blob: 6b91bcb14d252273452774c786c7f7c3d47dc622 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.apex.translation.operators;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Apex operator for simple native map operations.
 *
 * <p>Every tuple arriving on {@link #inputPort} is passed to the configured {@link ApexOperatorFn},
 * which may emit any number of tuples; emitted tuples are forwarded to {@link #outputPort}.
 */
public class ApexProcessFnOperator<InputT> extends BaseOperator {
  private static final Logger LOG = LoggerFactory.getLogger(ApexProcessFnOperator.class);

  /** When true, each emitted tuple is logged at DEBUG level. */
  private boolean traceTuples = false;

  // Serialized with Kryo's JavaSerializer; ApexOperatorFn extends Serializable.
  @Bind(JavaSerializer.class)
  private final ApexOperatorFn<InputT> fn;

  /**
   * Creates an operator that applies {@code fn} to every input tuple.
   *
   * @param fn the processing logic to run for each input tuple
   * @param traceTuples whether to log every emitted tuple at DEBUG level
   */
  public ApexProcessFnOperator(ApexOperatorFn<InputT> fn, boolean traceTuples) {
    super();
    this.traceTuples = traceTuples;
    this.fn = fn;
  }

  @SuppressWarnings("unused")
  private ApexProcessFnOperator() {
    // for Kryo
    fn = null;
  }

  // Not serialized (transient); recreated by this field initializer on construction. Forwards
  // tuples to outputPort, optionally logging them first.
  private final transient OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter =
      new OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>>() {
        @Override
        public void emit(ApexStreamTuple<? extends WindowedValue<?>> tuple) {
          if (traceTuples) {
            LOG.debug("\nemitting {}\n", tuple);
          }
          outputPort.emit(tuple);
        }
      };

  /** Something that emits results. */
  public interface OutputEmitter<T> {
    void emit(T tuple);
  }

  /** The processing logic for this operator. */
  public interface ApexOperatorFn<InputT> extends Serializable {
    /**
     * Processes one input tuple.
     *
     * @param input the incoming data or watermark tuple
     * @param outputEmitter sink for any tuples produced
     * @throws Exception any failure; the operator rethrows it unchecked
     */
    void process(
        ApexStreamTuple<WindowedValue<InputT>> input,
        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter)
        throws Exception;
  }

  /**
   * Convert {@link KV} into {@link KeyedWorkItem}s.
   *
   * @param options pipeline options; only consulted for the tuple-tracing flag
   * @return an operator emitting one single-element {@link KeyedWorkItem} per input window
   */
  public static <K, V> ApexProcessFnOperator<KV<K, V>> toKeyedWorkItems(
      ApexPipelineOptions options) {
    ApexOperatorFn<KV<K, V>> fn = new ToKeyedWorkItems<>();
    return new ApexProcessFnOperator<>(fn, options.isTupleTracingEnabled());
  }

  /**
   * Wraps each {@code KV} element as a {@link KeyedWorkItem} holding that single value.
   *
   * <p>Watermarks are forwarded unchanged; data elements are exploded per window first.
   */
  private static class ToKeyedWorkItems<K, V> implements ApexOperatorFn<KV<K, V>> {
    @Override
    public final void process(
        ApexStreamTuple<WindowedValue<KV<K, V>>> tuple,
        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter) {
      if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
        outputEmitter.emit(tuple);
      } else {
        for (WindowedValue<KV<K, V>> in : tuple.getValue().explodeWindows()) {
          KeyedWorkItem<K, V> kwi =
              KeyedWorkItems.elementsWorkItem(
                  in.getValue().getKey(),
                  Collections.singletonList(in.withValue(in.getValue().getValue())));
          outputEmitter.emit(ApexStreamTuple.DataTuple.of(in.withValue(kwi)));
        }
      }
    }
  }

  /**
   * Creates an operator that re-windows elements with {@code windowFn}.
   *
   * @param windowFn the window function used to assign windows
   * @param options pipeline options; only consulted for the tuple-tracing flag
   * @return an operator emitting one output per (input window, assigned window) pair
   */
  public static <T, W extends BoundedWindow> ApexProcessFnOperator<T> assignWindows(
      WindowFn<T, W> windowFn, ApexPipelineOptions options) {
    ApexOperatorFn<T> fn = new AssignWindows<>(windowFn);
    return new ApexProcessFnOperator<>(fn, options.isTupleTracingEnabled());
  }

  /**
   * Function for implementing {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
   *
   * <p>Watermarks are forwarded unchanged. Each data element is exploded into one value per window
   * it currently belongs to, {@link WindowFn#assignWindows} is applied to each, and one output is
   * emitted per assigned window.
   */
  private static class AssignWindows<T, W extends BoundedWindow> implements ApexOperatorFn<T> {
    private final WindowFn<T, W> windowFn;

    private AssignWindows(WindowFn<T, W> windowFn) {
      this.windowFn = windowFn;
    }

    @Override
    public final void process(
        ApexStreamTuple<WindowedValue<T>> tuple,
        OutputEmitter<ApexStreamTuple<? extends WindowedValue<?>>> outputEmitter)
        throws Exception {
      if (tuple instanceof ApexStreamTuple.WatermarkTuple) {
        outputEmitter.emit(tuple);
        return;
      }
      // A WindowedValue in several windows stands for one element per window (e.g. after sliding
      // windows). AssignContext.window() must report exactly one window, so explode first rather
      // than failing in Iterables.getOnlyElement (or collapsing the copies) for such inputs.
      for (WindowedValue<T> input : tuple.getValue().explodeWindows()) {
        for (W w : assignWindowsTo(input)) {
          WindowedValue<T> wv =
              WindowedValue.of(input.getValue(), input.getTimestamp(), w, input.getPane());
          outputEmitter.emit(ApexStreamTuple.DataTuple.of(wv));
        }
      }
    }

    /** Runs {@link #windowFn} on a value that belongs to exactly one window. */
    private Collection<W> assignWindowsTo(final WindowedValue<T> input) throws Exception {
      return windowFn.assignWindows(
          windowFn.new AssignContext() {
            @Override
            public T element() {
              return input.getValue();
            }

            @Override
            public Instant timestamp() {
              return input.getTimestamp();
            }

            @Override
            public BoundedWindow window() {
              return Iterables.getOnlyElement(input.getWindows());
            }
          });
    }
  }

  /** Input port; each tuple is handed to {@link #fn}, and checked exceptions are wrapped. */
  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> inputPort =
      new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() {
        @Override
        public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) {
          try {
            fn.process(tuple, outputEmitter);
          } catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
          }
        }
      };

  /** Output port. */
  @OutputPortFieldAnnotation(optional = true)
  public final transient DefaultOutputPort<ApexStreamTuple<? extends WindowedValue<?>>> outputPort =
      new DefaultOutputPort<>();
}