blob: f2d374a2414c0c8e7b9d77d911ab56d9cfd940e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.control;
import static org.apache.beam.runners.core.construction.SyntheticComponents.uniqueId;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.SyntheticComponents;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.sdk.v2.sdk.extensions.protobuf.ByteStringCoder;
/** Utility methods for creating {@link ProcessBundleDescriptor} instances. */
// TODO: Rename to ExecutableStages?
public class ProcessBundleDescriptors {
/**
* Note that the {@link ProcessBundleDescriptor} is constructed by:
*
* <ul>
* <li>Adding gRPC read and write nodes wiring them to the specified data endpoint.
* <li>Setting the state {@link ApiServiceDescriptor} to the specified state endpoint.
* <li>Modifying the coder on PCollections that are accessed as side inputs to be length
* prefixed making them binary compatible with the coder chosen when that side input is
* materialized.
* </ul>
*/
public static ExecutableProcessBundleDescriptor fromExecutableStage(
String id,
ExecutableStage stage,
ApiServiceDescriptor dataEndpoint,
ApiServiceDescriptor stateEndpoint)
throws IOException {
checkState(id != null, "id must be specified.");
checkState(stage != null, "stage must be specified.");
checkState(dataEndpoint != null, "dataEndpoint must be specified.");
checkState(stateEndpoint != null, "stateEndpoint must be specified.");
return fromExecutableStageInternal(id, stage, dataEndpoint, stateEndpoint);
}
public static ExecutableProcessBundleDescriptor fromExecutableStage(
String id, ExecutableStage stage, ApiServiceDescriptor dataEndpoint) throws IOException {
checkState(id != null, "id must be specified.");
checkState(stage != null, "stage must be specified.");
checkState(dataEndpoint != null, "dateEndpoint must be specified.");
return fromExecutableStageInternal(id, stage, dataEndpoint, null);
}
private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
String id,
ExecutableStage stage,
ApiServiceDescriptor dataEndpoint,
@Nullable ApiServiceDescriptor stateEndpoint)
throws IOException {
// Create with all of the processing transforms, and all of the components.
// TODO: Remove the unreachable subcomponents if the size of the descriptor matters.
Map<String, PTransform> stageTransforms =
stage.getTransforms().stream()
.collect(Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform));
Components.Builder components =
stage.getComponents().toBuilder().clearTransforms().putAllTransforms(stageTransforms);
ImmutableMap.Builder<String, RemoteInputDestination> inputDestinationsBuilder =
ImmutableMap.builder();
ImmutableMap.Builder<String, Coder> remoteOutputCodersBuilder = ImmutableMap.builder();
// The order of these does not matter.
inputDestinationsBuilder.put(
stage.getInputPCollection().getId(),
addStageInput(dataEndpoint, stage.getInputPCollection(), components));
remoteOutputCodersBuilder.putAll(
addStageOutputs(dataEndpoint, stage.getOutputPCollections(), components));
Map<String, Map<String, SideInputSpec>> sideInputSpecs = addSideInputs(stage, components);
Map<String, Map<String, BagUserStateSpec>> bagUserStateSpecs =
forBagUserStates(stage, components.build());
Map<String, Map<String, TimerSpec>> timerSpecs =
forTimerSpecs(
dataEndpoint, stage, components, inputDestinationsBuilder, remoteOutputCodersBuilder);
if (bagUserStateSpecs.size() > 0 || timerSpecs.size() > 0) {
lengthPrefixKeyCoder(stage.getInputPCollection().getId(), components);
}
// Copy data from components to ProcessBundleDescriptor.
ProcessBundleDescriptor.Builder bundleDescriptorBuilder =
ProcessBundleDescriptor.newBuilder().setId(id);
if (stateEndpoint != null) {
bundleDescriptorBuilder.setStateApiServiceDescriptor(stateEndpoint);
}
bundleDescriptorBuilder
.putAllCoders(components.getCodersMap())
.putAllEnvironments(components.getEnvironmentsMap())
.putAllPcollections(components.getPcollectionsMap())
.putAllWindowingStrategies(components.getWindowingStrategiesMap())
.putAllTransforms(components.getTransformsMap());
return ExecutableProcessBundleDescriptor.of(
bundleDescriptorBuilder.build(),
inputDestinationsBuilder.build(),
remoteOutputCodersBuilder.build(),
sideInputSpecs,
bagUserStateSpecs,
timerSpecs);
}
/**
* Patches the input coder of a stateful transform to ensure that the byte representation of a key
* used to partition the input element at the Runner, matches the key byte representation received
* for state requests and timers from the SDK Harness. Stateful transforms always have a KvCoder
* as input.
*/
private static void lengthPrefixKeyCoder(
String inputColId, Components.Builder componentsBuilder) {
RunnerApi.PCollection pcollection = componentsBuilder.getPcollectionsOrThrow(inputColId);
RunnerApi.Coder kvCoder = componentsBuilder.getCodersOrThrow(pcollection.getCoderId());
Preconditions.checkState(
ModelCoders.KV_CODER_URN.equals(kvCoder.getSpec().getUrn()),
"Stateful executable stages must use a KV coder, but is: %s",
kvCoder.getSpec().getUrn());
String keyCoderId = ModelCoders.getKvCoderComponents(kvCoder).keyCoderId();
// Retain the original coder, but wrap in LengthPrefixCoder
String newKeyCoderId =
LengthPrefixUnknownCoders.addLengthPrefixedCoder(keyCoderId, componentsBuilder, false);
// Replace old key coder with LengthPrefixCoder<old_key_coder>
kvCoder = kvCoder.toBuilder().setComponentCoderIds(0, newKeyCoderId).build();
componentsBuilder.putCoders(pcollection.getCoderId(), kvCoder);
}
private static Map<String, Coder<WindowedValue<?>>> addStageOutputs(
ApiServiceDescriptor dataEndpoint,
Collection<PCollectionNode> outputPCollections,
Components.Builder components)
throws IOException {
Map<String, Coder<WindowedValue<?>>> remoteOutputCoders = new LinkedHashMap<>();
for (PCollectionNode outputPCollection : outputPCollections) {
OutputEncoding outputEncoding = addStageOutput(dataEndpoint, components, outputPCollection);
remoteOutputCoders.put(outputEncoding.getPTransformId(), outputEncoding.getCoder());
}
return remoteOutputCoders;
}
private static RemoteInputDestination<WindowedValue<?>> addStageInput(
ApiServiceDescriptor dataEndpoint,
PCollectionNode inputPCollection,
Components.Builder components)
throws IOException {
String inputWireCoderId = WireCoders.addSdkWireCoder(inputPCollection, components);
@SuppressWarnings("unchecked")
Coder<WindowedValue<?>> wireCoder =
(Coder) WireCoders.instantiateRunnerWireCoder(inputPCollection, components.build());
RemoteGrpcPort inputPort =
RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(dataEndpoint)
.setCoderId(inputWireCoderId)
.build();
String inputId =
uniqueId(
String.format("fn/read/%s", inputPCollection.getId()), components::containsTransforms);
PTransform inputTransform =
RemoteGrpcPortRead.readFromPort(inputPort, inputPCollection.getId()).toPTransform();
components.putTransforms(inputId, inputTransform);
return RemoteInputDestination.of(wireCoder, inputId);
}
private static OutputEncoding addStageOutput(
ApiServiceDescriptor dataEndpoint,
Components.Builder components,
PCollectionNode outputPCollection)
throws IOException {
String outputWireCoderId = WireCoders.addSdkWireCoder(outputPCollection, components);
@SuppressWarnings("unchecked")
Coder<WindowedValue<?>> wireCoder =
(Coder) WireCoders.instantiateRunnerWireCoder(outputPCollection, components.build());
RemoteGrpcPort outputPort =
RemoteGrpcPort.newBuilder()
.setApiServiceDescriptor(dataEndpoint)
.setCoderId(outputWireCoderId)
.build();
RemoteGrpcPortWrite outputWrite =
RemoteGrpcPortWrite.writeToPort(outputPCollection.getId(), outputPort);
String outputId =
uniqueId(
String.format("fn/write/%s", outputPCollection.getId()),
components::containsTransforms);
PTransform outputTransform = outputWrite.toPTransform();
components.putTransforms(outputId, outputTransform);
return new AutoValue_ProcessBundleDescriptors_OutputEncoding(outputId, wireCoder);
}
public static Map<String, Map<String, SideInputSpec>> getSideInputs(ExecutableStage stage)
throws IOException {
return addSideInputs(stage, stage.getComponents().toBuilder());
}
private static Map<String, Map<String, SideInputSpec>> addSideInputs(
ExecutableStage stage, Components.Builder components) throws IOException {
ImmutableTable.Builder<String, String, SideInputSpec> idsToSpec = ImmutableTable.builder();
for (SideInputReference sideInputReference : stage.getSideInputs()) {
// Update the coder specification for side inputs to be length prefixed so that the
// SDK and Runner agree on how to encode/decode the key, window, and values for
// side inputs.
PCollectionNode pcNode = sideInputReference.collection();
PCollection pc = pcNode.getPCollection();
String lengthPrefixedCoderId =
LengthPrefixUnknownCoders.addLengthPrefixedCoder(pc.getCoderId(), components, false);
components.putPcollections(
pcNode.getId(), pc.toBuilder().setCoderId(lengthPrefixedCoderId).build());
FullWindowedValueCoder<KV<?, ?>> coder =
(FullWindowedValueCoder)
WireCoders.instantiateRunnerWireCoder(pcNode, components.build());
idsToSpec.put(
sideInputReference.transform().getId(),
sideInputReference.localName(),
SideInputSpec.of(
sideInputReference.transform().getId(),
sideInputReference.localName(),
getAccessPattern(sideInputReference),
coder.getValueCoder(),
coder.getWindowCoder()));
}
return idsToSpec.build().rowMap();
}
private static RunnerApi.FunctionSpec getAccessPattern(SideInputReference sideInputReference) {
try {
return RunnerApi.ParDoPayload.parseFrom(
sideInputReference.transform().getTransform().getSpec().getPayload())
.getSideInputsMap()
.get(sideInputReference.localName())
.getAccessPattern();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
private static Map<String, Map<String, BagUserStateSpec>> forBagUserStates(
ExecutableStage stage, Components components) throws IOException {
ImmutableTable.Builder<String, String, BagUserStateSpec> idsToSpec = ImmutableTable.builder();
for (UserStateReference userStateReference : stage.getUserStates()) {
FullWindowedValueCoder<KV<?, ?>> coder =
(FullWindowedValueCoder)
WireCoders.instantiateRunnerWireCoder(userStateReference.collection(), components);
idsToSpec.put(
userStateReference.transform().getId(),
userStateReference.localName(),
BagUserStateSpec.of(
userStateReference.transform().getId(),
userStateReference.localName(),
// We use the ByteString coder to save on encoding and decoding the actual key.
ByteStringCoder.of(),
// Usage of the ByteStringCoder provides a significant simplification for handling
// a logical stream of values by not needing to know where the element boundaries
// actually are. See StateRequestHandlers.java for further details.
ByteStringCoder.of(),
coder.getWindowCoder()));
}
return idsToSpec.build().rowMap();
}
private static Map<String, Map<String, TimerSpec>> forTimerSpecs(
ApiServiceDescriptor dataEndpoint,
ExecutableStage stage,
Components.Builder components,
ImmutableMap.Builder<String, RemoteInputDestination> remoteInputsBuilder,
ImmutableMap.Builder<String, Coder> outputTransformCodersBuilder)
throws IOException {
ImmutableTable.Builder<String, String, TimerSpec> idsToSpec = ImmutableTable.builder();
for (TimerReference timerReference : stage.getTimers()) {
RunnerApi.ParDoPayload payload =
RunnerApi.ParDoPayload.parseFrom(
timerReference.transform().getTransform().getSpec().getPayload());
RunnerApi.TimeDomain.Enum timeDomain =
payload.getTimerSpecsOrThrow(timerReference.localName()).getTimeDomain();
org.apache.beam.sdk.state.TimerSpec spec;
switch (timeDomain) {
case EVENT_TIME:
spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
break;
case PROCESSING_TIME:
spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
break;
case SYNCHRONIZED_PROCESSING_TIME:
spec = TimerSpecs.timer(TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
break;
default:
throw new IllegalArgumentException(String.format("Unknown time domain %s", timeDomain));
}
String mainInputName =
timerReference
.transform()
.getTransform()
.getInputsOrThrow(
Iterables.getOnlyElement(
Sets.difference(
timerReference.transform().getTransform().getInputsMap().keySet(),
Sets.union(
payload.getSideInputsMap().keySet(),
payload.getTimerSpecsMap().keySet()))));
String timerCoderId =
keyValueCoderId(
components
.getCodersOrThrow(components.getPcollectionsOrThrow(mainInputName).getCoderId())
.getComponentCoderIds(0),
payload.getTimerSpecsOrThrow(timerReference.localName()).getTimerCoderId(),
components);
RunnerApi.PCollection timerCollectionSpec =
components
.getPcollectionsOrThrow(mainInputName)
.toBuilder()
.setCoderId(timerCoderId)
.build();
// "Unroll" the timers into PCollections.
String inputTimerPCollectionId =
SyntheticComponents.uniqueId(
String.format(
"%s.timer.%s.in", timerReference.transform().getId(), timerReference.localName()),
components.getPcollectionsMap()::containsKey);
components.putPcollections(inputTimerPCollectionId, timerCollectionSpec);
remoteInputsBuilder.put(
inputTimerPCollectionId,
addStageInput(
dataEndpoint,
PipelineNode.pCollection(inputTimerPCollectionId, timerCollectionSpec),
components));
String outputTimerPCollectionId =
SyntheticComponents.uniqueId(
String.format(
"%s.timer.%s.out",
timerReference.transform().getId(), timerReference.localName()),
components.getPcollectionsMap()::containsKey);
components.putPcollections(outputTimerPCollectionId, timerCollectionSpec);
OutputEncoding outputEncoding =
addStageOutput(
dataEndpoint,
components,
PipelineNode.pCollection(outputTimerPCollectionId, timerCollectionSpec));
outputTransformCodersBuilder.put(outputEncoding.getPTransformId(), outputEncoding.getCoder());
components.putTransforms(
timerReference.transform().getId(),
// Since a transform can have more then one timer, update the transform inside components
// and not the original
components
.getTransformsOrThrow(timerReference.transform().getId())
.toBuilder()
.putInputs(timerReference.localName(), inputTimerPCollectionId)
.putOutputs(timerReference.localName(), outputTimerPCollectionId)
.build());
idsToSpec.put(
timerReference.transform().getId(),
timerReference.localName(),
TimerSpec.of(
timerReference.transform().getId(),
timerReference.localName(),
inputTimerPCollectionId,
outputTimerPCollectionId,
outputEncoding.getPTransformId(),
spec));
}
return idsToSpec.build().rowMap();
}
private static String keyValueCoderId(
String keyCoderId, String valueCoderId, Components.Builder components) {
String id =
uniqueId(
String.format("kv-%s-%s", keyCoderId, valueCoderId),
components.getCodersMap()::containsKey);
RunnerApi.Coder.Builder coder;
components.putCoders(
id,
RunnerApi.Coder.newBuilder()
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(ModelCoders.KV_CODER_URN))
.addComponentCoderIds(keyCoderId)
.addComponentCoderIds(valueCoderId)
.build());
return id;
}
@AutoValue
abstract static class OutputEncoding {
abstract String getPTransformId();
abstract Coder<WindowedValue<?>> getCoder();
}
/**
* A container type storing references to the key, value, and window {@link Coder} used when
* handling side input state requests.
*/
@AutoValue
public abstract static class SideInputSpec<K, T, W extends BoundedWindow> {
public static <T, W extends BoundedWindow> SideInputSpec of(
String transformId,
String sideInputId,
RunnerApi.FunctionSpec accessPattern,
Coder<T> elementCoder,
Coder<W> windowCoder) {
return new AutoValue_ProcessBundleDescriptors_SideInputSpec(
transformId, sideInputId, accessPattern, elementCoder, windowCoder);
}
public abstract String transformId();
public abstract String sideInputId();
public abstract RunnerApi.FunctionSpec accessPattern();
public abstract Coder<T> elementCoder();
public abstract Coder<W> windowCoder();
}
/**
* A container type storing references to the key, value, and window {@link Coder} used when
* handling bag user state requests.
*/
@AutoValue
public abstract static class BagUserStateSpec<K, V, W extends BoundedWindow> {
static <K, V, W extends BoundedWindow> BagUserStateSpec<K, V, W> of(
String transformId,
String userStateId,
Coder<K> keyCoder,
Coder<V> valueCoder,
Coder<W> windowCoder) {
return new AutoValue_ProcessBundleDescriptors_BagUserStateSpec(
transformId, userStateId, keyCoder, valueCoder, windowCoder);
}
public abstract String transformId();
public abstract String userStateId();
public abstract Coder<K> keyCoder();
public abstract Coder<V> valueCoder();
public abstract Coder<W> windowCoder();
}
/**
* A container type storing references to the key, timer and payload coders and the remote input
* destination used when handling timer requests.
*/
@AutoValue
public abstract static class TimerSpec<K, V, W extends BoundedWindow> {
static <K, V, W extends BoundedWindow> TimerSpec<K, V, W> of(
String transformId,
String timerId,
String inputCollectionId,
String outputCollectionId,
String outputTransformId,
org.apache.beam.sdk.state.TimerSpec timerSpec) {
return new AutoValue_ProcessBundleDescriptors_TimerSpec(
transformId,
timerId,
inputCollectionId,
outputCollectionId,
outputTransformId,
timerSpec);
}
public abstract String transformId();
public abstract String timerId();
public abstract String inputCollectionId();
public abstract String outputCollectionId();
public abstract String outputTransformId();
public abstract org.apache.beam.sdk.state.TimerSpec getTimerSpec();
}
/** */
@AutoValue
public abstract static class ExecutableProcessBundleDescriptor {
public static ExecutableProcessBundleDescriptor of(
ProcessBundleDescriptor descriptor,
Map<String, RemoteInputDestination> inputDestinations,
Map<String, Coder> outputTransformCoders,
Map<String, Map<String, SideInputSpec>> sideInputSpecs,
Map<String, Map<String, BagUserStateSpec>> bagUserStateSpecs,
Map<String, Map<String, TimerSpec>> timerSpecs) {
ImmutableTable.Builder copyOfSideInputSpecs = ImmutableTable.builder();
for (Map.Entry<String, Map<String, SideInputSpec>> outer : sideInputSpecs.entrySet()) {
for (Map.Entry<String, SideInputSpec> inner : outer.getValue().entrySet()) {
copyOfSideInputSpecs.put(outer.getKey(), inner.getKey(), inner.getValue());
}
}
ImmutableTable.Builder copyOfBagUserStateSpecs = ImmutableTable.builder();
for (Map.Entry<String, Map<String, BagUserStateSpec>> outer : bagUserStateSpecs.entrySet()) {
for (Map.Entry<String, BagUserStateSpec> inner : outer.getValue().entrySet()) {
copyOfBagUserStateSpecs.put(outer.getKey(), inner.getKey(), inner.getValue());
}
}
ImmutableTable.Builder copyOfTimerSpecs = ImmutableTable.builder();
for (Map.Entry<String, Map<String, TimerSpec>> outer : timerSpecs.entrySet()) {
for (Map.Entry<String, TimerSpec> inner : outer.getValue().entrySet()) {
copyOfTimerSpecs.put(outer.getKey(), inner.getKey(), inner.getValue());
}
}
return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(
descriptor,
inputDestinations,
Collections.unmodifiableMap(outputTransformCoders),
copyOfSideInputSpecs.build().rowMap(),
copyOfBagUserStateSpecs.build().rowMap(),
copyOfTimerSpecs.build().rowMap());
}
public abstract ProcessBundleDescriptor getProcessBundleDescriptor();
/**
* Get {@link RemoteInputDestination}s that input data/timers are sent to the {@link
* ProcessBundleDescriptor} over.
*/
public abstract Map<String, RemoteInputDestination> getRemoteInputDestinations();
/**
* Get all of the transforms materialized by this {@link ExecutableProcessBundleDescriptor} and
* the Java {@link Coder} for the wire format of that transform.
*/
public abstract Map<String, Coder> getRemoteOutputCoders();
/**
* Get a mapping from PTransform id to side input id to {@link SideInputSpec side inputs} that
* are used during execution.
*/
public abstract Map<String, Map<String, SideInputSpec>> getSideInputSpecs();
/**
* Get a mapping from PTransform id to user state input id to {@link BagUserStateSpec bag user
* states} that are used during execution.
*/
public abstract Map<String, Map<String, BagUserStateSpec>> getBagUserStateSpecs();
/**
* Get a mapping from PTransform id to timer id to {@link TimerSpec timer specs} that are used
* during execution.
*/
public abstract Map<String, Map<String, TimerSpec>> getTimerSpecs();
}
}