blob: 1d29402b26394f2e86f37422f26dab674d871164 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* DoFn transform implementation when there is no side input.
*
* @param <InputT> input type.
* @param <OutputT> output type.
*/
public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
private static final Logger LOG = LoggerFactory.getLogger(DoFnTransform.class.getName());
/**
* DoFnTransform Constructor.
*
* @param doFn doFn
* @param inputCoder input coder
* @param outputCoders output coders
* @param mainOutputTag main output tag
* @param additionalOutputTags additional output tags
* @param windowingStrategy windowing strategy
* @param options pipeline options
* @param displayData display data.
* @param doFnSchemaInformation doFn schema information.
* @param sideInputMapping side input mapping.
*/
public DoFnTransform(final DoFn<InputT, OutputT> doFn,
final Coder<InputT> inputCoder,
final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<OutputT> mainOutputTag,
final List<TupleTag<?>> additionalOutputTags,
final WindowingStrategy<?, ?> windowingStrategy,
final PipelineOptions options,
final DisplayData displayData,
final DoFnSchemaInformation doFnSchemaInformation,
final Map<String, PCollectionView<?>> sideInputMapping) {
super(doFn, inputCoder, outputCoders, mainOutputTag, additionalOutputTags, windowingStrategy,
Collections.emptyMap(), options, displayData, doFnSchemaInformation, sideInputMapping);
}
@Override
protected DoFn wrapDoFn(final DoFn initDoFn) {
return initDoFn;
}
@Override
public void onData(final WindowedValue<InputT> data) {
// Do not need any push-back logic.
checkAndInvokeBundle();
getDoFnRunner().processElement(data);
checkAndFinishBundle();
}
@Override
public void onWatermark(final Watermark watermark) {
checkAndInvokeBundle();
getOutputCollector().emitWatermark(watermark);
checkAndFinishBundle();
}
@Override
protected void beforeClose() {
}
@Override
OutputCollector wrapOutputCollector(final OutputCollector oc) {
return oc;
}
}