blob: 113928262cc356cf514dc074f7486f6c675e1e87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.runners.core.*;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* This is a base class for Beam DoFn Transforms.
*
* @param <InputT> input type.
* @param <InterT> intermediate type.
* @param <OutputT> output type.
*/
public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractDoFnTransform.class.getName());
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> additionalOutputTags;
private final Map<Integer, PCollectionView<?>> sideInputs;
private final WindowingStrategy<?, ?> windowingStrategy;
private final DoFn<InterT, OutputT> doFn;
private final SerializablePipelineOptions serializedOptions;
private final Coder<InputT> inputCoder;
private final Map<TupleTag<?>, Coder<?>> outputCoders;
private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
private transient DoFnRunner<InterT, OutputT> doFnRunner;
// null when there is no side input.
private transient PushbackSideInputDoFnRunner<InterT, OutputT> pushBackRunner;
private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
private transient DoFnRunners.OutputManager outputManager;
private transient InMemorySideInputReader sideInputReader;
// Variables for bundle.
// We consider count and time millis for start/finish bundle.
// If # of processed elements > bundleSize
// or elapsed time > bundleMillis, we finish the current bundle and start a new one
private transient long bundleSize;
private transient long bundleMillis;
private long prevBundleStartTime;
private long currBundleCount = 0;
private boolean bundleFinished = true;
private final DisplayData displayData;
/**
* AbstractDoFnTransform constructor.
*
* @param doFn doFn
* @param inputCoder input coder
* @param outputCoders output coders
* @param mainOutputTag main output tag
* @param additionalOutputTags additional output tags
* @param windowingStrategy windowing strategy
* @param sideInputs side inputs
* @param options pipeline options
* @param displayData display data.
*/
public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
final Coder<InputT> inputCoder,
final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<OutputT> mainOutputTag,
final List<TupleTag<?>> additionalOutputTags,
final WindowingStrategy<?, ?> windowingStrategy,
final Map<Integer, PCollectionView<?>> sideInputs,
final PipelineOptions options,
final DisplayData displayData) {
this.doFn = doFn;
this.inputCoder = inputCoder;
this.outputCoders = outputCoders;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
this.sideInputs = sideInputs;
this.serializedOptions = new SerializablePipelineOptions(options);
this.windowingStrategy = windowingStrategy;
this.displayData = displayData;
}
/**
* Getter function for side inputs.
*
* @return the side inputs
*/
final Map<Integer, PCollectionView<?>> getSideInputs() {
return sideInputs;
}
/**
* Getter function for output manager.
*
* @return the output manager.
*/
final DoFnRunners.OutputManager getOutputManager() {
return outputManager;
}
/**
* Getter function for windowing strategy.
*
* @return the windowing strategy.
*/
final WindowingStrategy getWindowingStrategy() {
return windowingStrategy;
}
/**
* Getter function for output tag.
*
* @return main output tag.
*/
final TupleTag<OutputT> getMainOutputTag() {
return mainOutputTag;
}
/**
* Getter function for DoFn runner.
*
* @return DoFn runner.
*/
final DoFnRunner<InterT, OutputT> getDoFnRunner() {
return doFnRunner;
}
/**
* Getter function for push back runner.
*
* @return push back runner.
*/
final PushbackSideInputDoFnRunner<InterT, OutputT> getPushBackRunner() {
return pushBackRunner;
}
/**
* Getter function for side input reader.
*
* @return side input reader.
*/
final InMemorySideInputReader getSideInputReader() {
return sideInputReader;
}
/**
* Getter function for DoFn.
*
* @return DoFn.
*/
public final DoFn getDoFn() {
return doFn;
}
/**
* Checks whether the bundle is finished or not.
* Starts the bundle if it is done.
* <p>
* TODO #263: Partial Combining for Beam Streaming
* We may want to use separate methods for doFnRunner/pushBackRunner
* (same applies to the other bundle-related methods)
*/
final void checkAndInvokeBundle() {
if (bundleFinished) {
bundleFinished = false;
if (pushBackRunner == null) {
doFnRunner.startBundle();
} else {
pushBackRunner.startBundle();
}
prevBundleStartTime = System.currentTimeMillis();
currBundleCount = 0;
}
currBundleCount += 1;
}
/**
* Checks whether it is time to finish the bundle and finish it.
*/
final void checkAndFinishBundle() {
if (!bundleFinished) {
if (currBundleCount >= bundleSize || System.currentTimeMillis() - prevBundleStartTime >= bundleMillis) {
bundleFinished = true;
if (pushBackRunner == null) {
doFnRunner.finishBundle();
} else {
pushBackRunner.finishBundle();
}
}
}
}
/**
* Finish bundle without checking for conditions.
*/
final void forceFinishBundle() {
if (!bundleFinished) {
bundleFinished = true;
if (pushBackRunner == null) {
doFnRunner.finishBundle();
} else {
pushBackRunner.finishBundle();
}
}
}
@Override
public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>> oc) {
// deserialize pipeline option
final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
this.outputCollector = wrapOutputCollector(oc);
this.bundleMillis = options.getMaxBundleTimeMills();
this.bundleSize = options.getMaxBundleSize();
// create output manager
outputManager = new DefaultOutputManager<>(outputCollector, mainOutputTag);
// create side input reader
sideInputReader = new InMemorySideInputReader(new ArrayList<>(sideInputs.values()));
// this transform does not support state and timer.
final StepContext stepContext = new StepContext() {
@Override
public StateInternals stateInternals() {
throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
}
@Override
public TimerInternals timerInternals() {
throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
}
};
final DoFn wrappedDoFn = wrapDoFn(doFn);
// invoker
doFnInvoker = DoFnInvokers.invokerFor(wrappedDoFn);
doFnInvoker.invokeSetup();
// DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
// and that this approach is the standard used by most of the Beam runners
doFnRunner = DoFnRunners.simpleRunner(
options,
wrappedDoFn,
sideInputReader,
outputManager,
mainOutputTag,
additionalOutputTags,
stepContext,
inputCoder,
outputCoders,
windowingStrategy);
pushBackRunner = sideInputs.isEmpty()
? null
: SimplePushbackSideInputDoFnRunner.<InterT, OutputT>create(doFnRunner, sideInputs.values(), sideInputReader);
}
/**
* Getter function for output collector.
*
* @return output collector.
*/
public final OutputCollector<WindowedValue<OutputT>> getOutputCollector() {
return outputCollector;
}
@Override
public final void close() {
beforeClose();
forceFinishBundle();
doFnInvoker.invokeTeardown();
}
@Override
public final String toString() {
return this.getClass().getSimpleName() + " / " + displayData.toString().replace(":", " / ");
}
/**
* An abstract function that wraps the original doFn.
*
* @param originalDoFn the original doFn.
* @return wrapped doFn.
*/
abstract DoFn wrapDoFn(final DoFn originalDoFn);
/**
* An abstract function that wraps the original output collector.
*
* @param oc the original outputCollector.
* @return wrapped output collector.
*/
abstract OutputCollector wrapOutputCollector(final OutputCollector oc);
/**
* An abstract function that is called before close.
*/
abstract void beforeClose();
}