blob: 28f87891c7ce32c5215af234f24ac6550cc386ad [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable;
import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.splittabledofn.SDFFeederViaStateAndTimers;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
/**
* The {@link TransformEvaluatorFactory} for {@link #URN}, which reads from a {@link
* DirectGroupByKey#DIRECT_GBKO_URN} and feeds the data, using state and timers, to a {@link
* ExecutableStage} whose first instruction is an SDF.
*/
class SplittableRemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
public static final String URN = "beam:directrunner:transforms:splittable_remote_stage:v1";
// A fictional transform that transforms from KWI<unique key, KV<element, restriction>>
// to simply KV<element, restriction> taken by the SDF inside the ExecutableStage.
public static final String FEED_SDF_URN = "beam:directrunner:transforms:feed_sdf:v1";
private final BundleFactory bundleFactory;
private final JobBundleFactory jobBundleFactory;
private final StepStateAndTimers.Provider stp;
SplittableRemoteStageEvaluatorFactory(
BundleFactory bundleFactory,
JobBundleFactory jobBundleFactory,
StepStateAndTimers.Provider stepStateAndTimers) {
this.bundleFactory = bundleFactory;
this.jobBundleFactory = jobBundleFactory;
this.stp = stepStateAndTimers;
}
@Nullable
@Override
public <InputT> TransformEvaluator<InputT> forApplication(
PTransformNode application, CommittedBundle<?> inputBundle) throws Exception {
return new SplittableRemoteStageEvaluator(
bundleFactory,
jobBundleFactory,
stp.forStepAndKey(application, inputBundle.getKey()),
application);
}
@Override
public void cleanup() throws Exception {
jobBundleFactory.close();
}
private static class SplittableRemoteStageEvaluator<InputT, RestrictionT>
implements TransformEvaluator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> {
private final PTransformNode transform;
private final ExecutableStage stage;
private final CopyOnAccessInMemoryStateInternals<byte[]> stateInternals;
private final DirectTimerInternals timerInternals;
private final RemoteBundle bundle;
private final FnDataReceiver<WindowedValue<?>> mainInput;
private final Collection<UncommittedBundle<?>> outputs;
private final SDFFeederViaStateAndTimers<InputT, RestrictionT> feeder;
private SplittableRemoteStageEvaluator(
BundleFactory bundleFactory,
JobBundleFactory jobBundleFactory,
StepStateAndTimers<byte[]> stp,
PTransformNode transform)
throws Exception {
this.stateInternals = stp.stateInternals();
this.timerInternals = stp.timerInternals();
this.transform = transform;
this.stage =
ExecutableStage.fromPayload(
ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload()));
this.outputs = new ArrayList<>();
FullWindowedValueCoder<KV<InputT, RestrictionT>> windowedValueCoder =
(FullWindowedValueCoder<KV<InputT, RestrictionT>>)
WireCoders.<KV<InputT, RestrictionT>>instantiateRunnerWireCoder(
stage.getInputPCollection(), stage.getComponents());
KvCoder<InputT, RestrictionT> kvCoder =
(KvCoder<InputT, RestrictionT>) windowedValueCoder.getValueCoder();
this.feeder =
new SDFFeederViaStateAndTimers<>(
stateInternals,
timerInternals,
kvCoder.getKeyCoder(),
kvCoder.getValueCoder(),
(Coder<BoundedWindow>) windowedValueCoder.getWindowCoder());
this.bundle =
jobBundleFactory
.forStage(stage)
.getBundle(
BundleFactoryOutputReceiverFactory.create(
bundleFactory, stage.getComponents(), outputs::add),
StateRequestHandler.unsupported(),
// TODO: Wire in splitting via a split listener
new BundleProgressHandler() {
@Override
public void onProgress(ProcessBundleProgressResponse progress) {}
@Override
public void onCompleted(ProcessBundleResponse response) {}
});
this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values());
}
@Override
public void processElement(
WindowedValue<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> windowedWorkItem)
throws Exception {
KeyedWorkItem<byte[], KV<InputT, RestrictionT>> kwi = windowedWorkItem.getValue();
WindowedValue<KV<InputT, RestrictionT>> elementRestriction =
Iterables.getOnlyElement(kwi.elementsIterable(), null);
if (elementRestriction != null) {
feeder.seed(elementRestriction);
} else {
elementRestriction = feeder.resume(Iterables.getOnlyElement(kwi.timersIterable()));
}
mainInput.accept(elementRestriction);
}
@Override
public TransformResult<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> finishBundle()
throws Exception {
bundle.close();
feeder.commit();
CopyOnAccessInMemoryStateInternals<byte[]> state = stateInternals.commit();
StepTransformResult.Builder<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>> result =
StepTransformResult.withHold(transform, state.getEarliestWatermarkHold());
return result
.addOutput(outputs)
.withState(state)
.withTimerUpdate(timerInternals.getTimerUpdate())
.build();
}
}
}