blob: 540c08d2c34c6edbc7a0785ac521b2f459948cb7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* {@link PTransform PTransforms} for converting between explicit and implicit form of various Beam
* values.
*/
public class Reify {
/**
 * Private implementation of {@link #viewAsValues(PCollectionView, Coder)}.
 *
 * <p>Each input element is emitted as the key of a {@link KV} whose value is whatever {@code
 * ProcessContext#sideInput} returns for the wrapped view while that element is being processed.
 *
 * @param <K> type of the input elements, which become the output keys
 * @param <V> type of the value read from the view
 */
private static class ReifyView<K, V> extends PTransform<PCollection<K>, PCollection<KV<K, V>>> {
  /** View that is read as a side input for every element. */
  private final PCollectionView<V> view;

  /** Coder used for the value half of the output {@link KvCoder}. */
  private final Coder<V> coder;

  private ReifyView(PCollectionView<V> view, Coder<V> coder) {
    this.view = view;
    this.coder = coder;
  }

  @Override
  public PCollection<KV<K, V>> expand(PCollection<K> input) {
    DoFn<K, KV<K, V>> pairWithViewFn =
        new DoFn<K, KV<K, V>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            K key = c.element();
            V viewValue = c.sideInput(view);
            c.output(KV.of(key, viewValue));
          }
        };

    PCollection<KV<K, V>> paired = input.apply(ParDo.of(pairWithViewFn).withSideInputs(view));

    // Keys reuse the input's coder; values use the coder handed to the constructor.
    return paired.setCoder(KvCoder.of(input.getCoder(), coder));
  }
}
/**
 * Private implementation of {@link #viewInGlobalWindow(PCollectionView, Coder)}.
 *
 * <p>Creates a single {@code null} {@link Void} element, pairs it with the view's value through
 * {@link #viewAsValues(PCollectionView, Coder)}, and then keeps only the value.
 *
 * @param <V> type of the value read from the view
 */
private static class ReifyViewInGlobalWindow<V> extends PTransform<PBegin, PCollection<V>> {
  /** View whose value is emitted. */
  private final PCollectionView<V> view;

  /** Coder forwarded to {@link #viewAsValues(PCollectionView, Coder)} for the view's value. */
  private final Coder<V> coder;

  private ReifyViewInGlobalWindow(PCollectionView<V> view, Coder<V> coder) {
    this.view = view;
    this.coder = coder;
  }

  @Override
  public PCollection<V> expand(PBegin input) {
    // A lone null element serves as the carrier that the view value gets attached to.
    PCollection<Void> seed = input.apply(Create.of((Void) null).withCoder(VoidCoder.of()));
    PCollection<KV<Void, V>> seedWithView = seed.apply(Reify.<Void, V>viewAsValues(view, coder));
    return seedWithView.apply(Values.create());
  }
}
/**
 * Private implementation of {@link #windows()}.
 *
 * <p>Every element is wrapped, together with its timestamp, window and pane, in a {@link
 * ValueInSingleWindow}, and the wrapper is emitted with the element's own timestamp.
 *
 * @param <T> element type
 */
private static class Window<T>
    extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
  @Override
  public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {
    DoFn<T, ValueInSingleWindow<T>> reifyWindowFn =
        new DoFn<T, ValueInSingleWindow<T>>() {
          @ProcessElement
          public void processElement(
              @Element T element,
              @Timestamp Instant timestamp,
              BoundedWindow window,
              PaneInfo pane,
              OutputReceiver<ValueInSingleWindow<T>> r) {
            ValueInSingleWindow<T> wrapped =
                ValueInSingleWindow.of(element, timestamp, window, pane);
            r.outputWithTimestamp(wrapped, timestamp);
          }
        };

    PCollection<ValueInSingleWindow<T>> reified = input.apply(ParDo.of(reifyWindowFn));

    // The output coder is assembled from the input's element coder and its WindowFn's coder.
    Coder<T> elementCoder = input.getCoder();
    Coder<? extends BoundedWindow> windowCoder =
        input.getWindowingStrategy().getWindowFn().windowCoder();
    return reified.setCoder(ValueInSingleWindow.Coder.of(elementCoder, windowCoder));
  }
}
/**
 * Private implementation of {@link #timestamps()}.
 *
 * <p>Every element is emitted as a {@link TimestampedValue} holding the element and the
 * timestamp it was processed with.
 *
 * @param <T> element type
 */
private static class Timestamp<T>
    extends PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> {
  @Override
  public PCollection<TimestampedValue<T>> expand(PCollection<T> input) {
    DoFn<T, TimestampedValue<T>> reifyTimestampFn =
        new DoFn<T, TimestampedValue<T>>() {
          @ProcessElement
          public void processElement(
              @Element T element,
              @Timestamp Instant timestamp,
              OutputReceiver<TimestampedValue<T>> r) {
            TimestampedValue<T> stamped = TimestampedValue.of(element, timestamp);
            r.output(stamped);
          }
        };

    PCollection<TimestampedValue<T>> reified = input.apply(ParDo.of(reifyTimestampFn));

    // Wrap the input's element coder so the timestamp is encoded alongside the value.
    return reified.setCoder(TimestampedValueCoder.of(input.getCoder()));
  }
}
private static class WindowInValue<K, V>
extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>> {
@Override
public PCollection<KV<K, ValueInSingleWindow<V>>> expand(PCollection<KV<K, V>> input) {
KvCoder<K, V> coder = (KvCoder<K, V>) input.getCoder();
return input
.apply(
ParDo.of(
new DoFn<KV<K, V>, KV<K, ValueInSingleWindow<V>>>() {
@ProcessElement
public void processElement(
@Element KV<K, V> element,
@Timestamp Instant timestamp,
BoundedWindow window,
PaneInfo pane,
OutputReceiver<KV<K, ValueInSingleWindow<V>>> r) {
r.output(
KV.of(
element.getKey(),
ValueInSingleWindow.of(element.getValue(), timestamp, window, pane)));
}
}))
.setCoder(
KvCoder.of(
coder.getKeyCoder(),
ValueInSingleWindow.Coder.of(
coder.getValueCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder())));
}
}
/**
 * Private implementation of {@link #timestampsInValue()}.
 *
 * <p>Keys pass through untouched; each value is replaced by a {@link TimestampedValue} holding
 * the value and the timestamp the element was processed with.
 *
 * <p>The input's coder is cast to {@link KvCoder}, so an input carrying any other coder fails
 * that cast.
 *
 * @param <K> key type
 * @param <V> value type
 */
private static class TimestampInValue<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>> {
  @Override
  public PCollection<KV<K, TimestampedValue<V>>> expand(PCollection<KV<K, V>> input) {
    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();

    DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> reifyTimestampFn =
        new DoFn<KV<K, V>, KV<K, TimestampedValue<V>>>() {
          @ProcessElement
          public void processElement(
              @Element KV<K, V> element,
              @Timestamp Instant timestamp,
              OutputReceiver<KV<K, TimestampedValue<V>>> r) {
            K key = element.getKey();
            TimestampedValue<V> stamped = TimestampedValue.of(element.getValue(), timestamp);
            r.output(KV.of(key, stamped));
          }
        };

    PCollection<KV<K, TimestampedValue<V>>> reified =
        input.apply(ParDo.of(reifyTimestampFn));

    // Same key coder as the input; the value coder is wrapped to carry the timestamp.
    return reified.setCoder(
        KvCoder.of(
            inputKvCoder.getKeyCoder(),
            TimestampedValueCoder.of(inputKvCoder.getValueCoder())));
  }
}
/**
 * Private implementation of {@link #extractTimestampsFromValues()}.
 *
 * <p>For every {@code KV<K, TimestampedValue<V>>} the wrapped value is unwrapped and the
 * resulting {@code KV<K, V>} is emitted with the timestamp that was stored in the {@link
 * TimestampedValue}.
 *
 * <p>The input's coder is cast to {@link KvCoder} and its value coder to {@link
 * TimestampedValueCoder}; an input carrying other coders fails those casts.
 *
 * @param <K> key type
 * @param <V> type of the value wrapped inside each {@link TimestampedValue}
 */
private static class ExtractTimestampsFromValues<K, V>
    extends PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>> {
  @Override
  public PCollection<KV<K, V>> expand(PCollection<KV<K, TimestampedValue<V>>> input) {
    KvCoder<K, TimestampedValue<V>> inputKvCoder =
        (KvCoder<K, TimestampedValue<V>>) input.getCoder();
    TimestampedValueCoder<V> stampedCoder =
        (TimestampedValueCoder<V>) inputKvCoder.getValueCoder();

    DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> extractFn =
        new DoFn<KV<K, TimestampedValue<V>>, KV<K, V>>() {
          // Report the largest possible skew so that the embedded timestamp may be used for
          // output even when it lies before the timestamp of the element being processed.
          @Override
          public Duration getAllowedTimestampSkew() {
            return Duration.millis(Long.MAX_VALUE);
          }

          @ProcessElement
          public void processElement(
              @Element KV<K, TimestampedValue<V>> kv, OutputReceiver<KV<K, V>> r) {
            K key = kv.getKey();
            TimestampedValue<V> stamped = kv.getValue();
            r.outputWithTimestamp(KV.of(key, stamped.getValue()), stamped.getTimestamp());
          }
        };

    PCollection<KV<K, V>> unwrapped = input.apply(ParDo.of(extractFn));

    // Strip the TimestampedValue layer from the coder as well.
    return unwrapped.setCoder(
        KvCoder.of(inputKvCoder.getKeyCoder(), stampedCoder.getValueCoder()));
  }
}
/** Not instantiable: this class only exposes static factory methods. */
private Reify() {
  // Intentionally empty.
}
/**
 * Returns a {@link PTransform} that wraps each input element, together with its timestamp, in a
 * {@link TimestampedValue}.
 *
 * @param <T> element type
 * @return a transform from {@code PCollection<T>} to {@code PCollection<TimestampedValue<T>>}
 */
public static <T> PTransform<PCollection<T>, PCollection<TimestampedValue<T>>> timestamps() {
  return new Timestamp<T>();
}
/**
 * Returns a {@link PTransform} that keeps each input {@link KV}'s key and replaces its value
 * with a {@link TimestampedValue} holding that value and the element's timestamp.
 *
 * @param <K> key type
 * @param <V> value type
 * @return a transform from {@code KV<K, V>} elements to {@code KV<K, TimestampedValue<V>>}
 *     elements
 */
public static <K, V>
    PTransform<PCollection<KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
        timestampsInValue() {
  return new TimestampInValue<K, V>();
}
/**
 * Returns a {@link PTransform} that reifies information from the processing context (the
 * element's timestamp, window and pane) into instances of {@link ValueInSingleWindow}.
 *
 * @param <T> element type
 * @return a transform from {@code PCollection<T>} to {@code PCollection<ValueInSingleWindow<T>>}
 */
public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> windows() {
  return new Window<T>();
}
/**
 * Returns a {@link PTransform} that keeps each input {@link KV}'s key and replaces its value
 * with a {@link ValueInSingleWindow} carrying that value along with the element's timestamp,
 * window and pane info.
 *
 * @param <K> key type
 * @param <V> value type
 * @return a transform from {@code KV<K, V>} elements to {@code KV<K, ValueInSingleWindow<V>>}
 *     elements
 */
public static <K, V>
    PTransform<PCollection<KV<K, V>>, PCollection<KV<K, ValueInSingleWindow<V>>>>
        windowsInValue() {
  return new WindowInValue<K, V>();
}
/**
 * Returns a {@link PTransform} that unwraps the {@link TimestampedValue} in each input {@link
 * KV}'s value and emits the resulting {@code KV<K, V>} with the timestamp that was stored in
 * that {@link TimestampedValue}.
 *
 * @param <K> key type
 * @param <V> type of the value wrapped inside each {@link TimestampedValue}
 * @return a transform from {@code KV<K, TimestampedValue<V>>} elements to {@code KV<K, V>}
 *     elements
 */
public static <K, V>
    PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
        extractTimestampsFromValues() {
  return new ExtractTimestampsFromValues<K, V>();
}
/**
 * Returns a {@link PTransform} that pairs each element of a collection with the value of a side
 * input associated with the element's window.
 *
 * @param view the view to read as a side input for every element
 * @param coder coder for the view's value; used as the value coder of the output {@link KvCoder}
 * @param <K> type of the input elements, which become the output keys
 * @param <V> type of the view's value
 * @return a transform from {@code PCollection<K>} to {@code PCollection<KV<K, V>>}
 */
public static <K, V> PTransform<PCollection<K>, PCollection<KV<K, V>>> viewAsValues(
    PCollectionView<V> view, Coder<V> coder) {
  return new ReifyView<K, V>(view, coder);
}
/**
 * Returns a {@link PTransform} producing a {@link PCollection} that consists of a single
 * element, containing the value of the given view in the global window.
 *
 * @param view the view whose value is emitted
 * @param coder coder for the view's value
 * @param <K> not referenced by the parameters, the return type, or the body of this method
 * @param <V> type of the view's value
 * @return a root transform from {@link PBegin} to {@code PCollection<V>}
 */
public static <K, V> PTransform<PBegin, PCollection<V>> viewInGlobalWindow(
    PCollectionView<V> view, Coder<V> coder) {
  return new ReifyViewInGlobalWindow<V>(view, coder);
}
}