/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.beam.runners.apex.translation.operators; |
| |
| import com.datatorrent.api.DefaultInputPort; |
| import com.datatorrent.api.DefaultOutputPort; |
| import com.datatorrent.api.annotation.OutputPortFieldAnnotation; |
| import com.datatorrent.common.util.BaseOperator; |
| import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; |
| import com.esotericsoftware.kryo.serializers.JavaSerializer; |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.Collections; |
| import org.apache.beam.runners.apex.ApexPipelineOptions; |
| import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; |
| import org.apache.beam.runners.core.KeyedWorkItem; |
| import org.apache.beam.runners.core.KeyedWorkItems; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.WindowFn; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** Apex operator for simple native map operations. */ |
| public class ApexProcessFnOperator<InputT> extends BaseOperator { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ApexProcessFnOperator.class); |
| private boolean traceTuples = false; |
| |
| @Bind(JavaSerializer.class) |
| private final ApexOperatorFn<InputT> fn; |
| |
| public ApexProcessFnOperator(ApexOperatorFn<InputT> fn, boolean traceTuples) { |
| super(); |
| this.traceTuples = traceTuples; |
| this.fn = fn; |
| } |
| |
| @SuppressWarnings("unused") |
| private ApexProcessFnOperator() { |
| // for Kryo |
| fn = null; |
| } |
| |
| private final transient OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter = |
| new OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>>() { |
| @Override |
| public void emit(ApexStreamTuple<? extends WindowedValue<?>> tuple) { |
| if (traceTuples) { |
| LOG.debug("\nemitting {}\n", tuple); |
| } |
| outputPort.emit(tuple); |
| } |
| }; |
| |
| /** Something that emits results. */ |
| public interface OutputEmitter<T> { |
| void emit(T tuple); |
| } |
| |
| /** The processing logic for this operator. */ |
| public interface ApexOperatorFn<InputT> extends Serializable { |
| void process( |
| ApexStreamTuple<WindowedValue<InputT>> input, |
| OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) |
| throws Exception; |
| } |
| |
| /** Convert {@link KV} into {@link KeyedWorkItem}s. */ |
| public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems( |
| ApexPipelineOptions options) { |
| ApexOperatorFn<KV<K, V>> fn = new ToKeyedWorkItems<>(); |
| return new ApexProcessFnOperator<>(fn, options.isTupleTracingEnabled()); |
| } |
| |
| private static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> { |
| @Override |
| public final void process( |
| ApexStreamTuple<WindowedValue<KV<K, V>>> tuple, |
| OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) { |
| |
| if (tuple instanceof ApexStreamTuple.WatermarkTuple) { |
| outputEmitter.emit(tuple); |
| } else { |
| for (WindowedValue<KV<K, V>> in : tuple.getValue().explodeWindows()) { |
| KeyedWorkItem<K, V> kwi = |
| KeyedWorkItems.elementsWorkItem( |
| in.getValue().getKey(), |
| Collections.singletonList(in.withValue(in.getValue().getValue()))); |
| outputEmitter.emit(ApexStreamTuple.DataTuple.of(in.withValue(kwi))); |
| } |
| } |
| } |
| } |
| |
| public static <T, W extends BoundedWindow> ApexProcessFnOperator<T> assignWindows( |
| WindowFn<T, W> windowFn, ApexPipelineOptions options) { |
| ApexOperatorFn<T> fn = new AssignWindows<>(windowFn); |
| return new ApexProcessFnOperator<>(fn, options.isTupleTracingEnabled()); |
| } |
| |
| /** Function for implementing {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. */ |
| private static class AssignWindows<T, W extends BoundedWindow> implements ApexOperatorFn<T> { |
| private final WindowFn<T, W> windowFn; |
| |
| private AssignWindows(WindowFn<T, W> windowFn) { |
| this.windowFn = windowFn; |
| } |
| |
| @Override |
| public final void process( |
| ApexStreamTuple<WindowedValue<T>> tuple, |
| OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) |
| throws Exception { |
| if (tuple instanceof ApexStreamTuple.WatermarkTuple) { |
| outputEmitter.emit(tuple); |
| } else { |
| final WindowedValue<T> input = tuple.getValue(); |
| Collection<W> windows = |
| windowFn.assignWindows( |
| windowFn.new AssignContext() { |
| @Override |
| public T element() { |
| return input.getValue(); |
| } |
| |
| @Override |
| public Instant timestamp() { |
| return input.getTimestamp(); |
| } |
| |
| @Override |
| public BoundedWindow window() { |
| return Iterables.getOnlyElement(input.getWindows()); |
| } |
| }); |
| for (W w : windows) { |
| WindowedValue<T> wv = |
| WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPane()); |
| outputEmitter.emit(ApexStreamTuple.DataTuple.of(wv)); |
| } |
| } |
| } |
| } |
| |
| /** Input port. */ |
| public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> inputPort = |
| new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { |
| @Override |
| public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) { |
| try { |
| fn.process(tuple, outputEmitter); |
| } catch (Exception e) { |
| Throwables.throwIfUnchecked(e); |
| throw new RuntimeException(e); |
| } |
| } |
| }; |
| |
| /** Output port. */ |
| @OutputPortFieldAnnotation(optional = true) |
| public final transient DefaultOutputPort<ApexStreamTuple<? extends WindowedValue<?>>> outputPort = |
| new DefaultOutputPort<>(); |
| } |