blob: cf75cb469536439b5e30cc496560f035d8c83a81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.runners.core.*;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.punctuation.Watermark;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Groups elements according to key and window.
 *
 * <p>Incoming elements are buffered per key by {@link #onData(WindowedValue)}. The buffered elements are
 * handed to the wrapped group-also-by-window DoFn, and the per-key timers are fired, whenever
 * {@link #onWatermark(Watermark)} is called and once more from {@link #beforeClose()}.
 *
 * @param <K> key type.
 * @param <InputT> input type.
 */
public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
  extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>, KV<K, Iterable<InputT>>> {
  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class);

  private final SystemReduceFn reduceFn;
  /** Elements buffered per key; each list is cleared (the key itself is kept) once it has been processed. */
  private final Map<K, List<WindowedValue<InputT>>> keyToValues;
  private transient InMemoryTimerInternalsFactory inMemoryTimerInternalsFactory;
  private transient InMemoryStateInternalsFactory inMemoryStateInternalsFactory;
  /** The last watermark emitted downstream; starts at {@code Long.MIN_VALUE}. */
  private Watermark prevOutputWatermark;
  /** Per-key watermark hold, recorded when an ON_TIME pane is emitted for that key. */
  private final Map<K, Watermark> keyAndWatermarkHoldMap;

  /**
   * GroupByKey constructor.
   *
   * @param outputCoders      output coders
   * @param mainOutputTag     main output tag
   * @param windowingStrategy windowing strategy
   * @param options           pipeline options
   * @param reduceFn          reduce function
   * @param displayData       display data.
   */
  public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
                                          final TupleTag<KV<K, Iterable<InputT>>> mainOutputTag,
                                          final WindowingStrategy<?, ?> windowingStrategy,
                                          final PipelineOptions options,
                                          final SystemReduceFn reduceFn,
                                          final DisplayData displayData) {
    super(null, /* doFn */
      null, /* inputCoder */
      outputCoders,
      mainOutputTag,
      Collections.emptyList(),  /*  GBK does not have additional outputs */
      windowingStrategy,
      Collections.emptyMap(), /*  GBK does not have additional side inputs */
      options,
      displayData);
    this.keyToValues = new HashMap<>();
    this.reduceFn = reduceFn;
    this.prevOutputWatermark = new Watermark(Long.MIN_VALUE);
    this.keyAndWatermarkHoldMap = new HashMap<>();
  }

  /**
   * This creates a new DoFn that groups elements by key and window.
   * It also (re)creates the in-memory state and timer factories, which share a single per-key map.
   *
   * @param doFn original doFn; it is not used here (the constructor passes {@code null} to the super class).
   * @return GroupAlsoByWindowViaWindowSetNewDoFn
   */
  @Override
  protected DoFn wrapDoFn(final DoFn doFn) {
    // Both factories are backed by the same map, so state and timers of a key live in one holder.
    final Map<K, StateAndTimerForKey> map = new HashMap<>();
    this.inMemoryStateInternalsFactory = new InMemoryStateInternalsFactory(map);
    this.inMemoryTimerInternalsFactory = new InMemoryTimerInternalsFactory(map);

    // This function performs group by key and window operation
    return
      GroupAlsoByWindowViaWindowSetNewDoFn.create(
        getWindowingStrategy(),
        inMemoryStateInternalsFactory,
        inMemoryTimerInternalsFactory,
        null, // GBK has no sideinput.
        reduceFn,
        getOutputManager(),
        getMainOutputTag());
  }

  /**
   * Wraps the given output collector so that the watermark hold of each key can be tracked.
   *
   * @param oc the output collector to wrap.
   * @return a {@link GBKWOutputCollector} delegating to {@code oc}.
   */
  @Override
  OutputCollector wrapOutputCollector(final OutputCollector oc) {
    return new GBKWOutputCollector(oc);
  }

  /**
   * It collects data for each key.
   * The collected data are emitted at {@link GroupByKeyAndWindowDoFnTransform#onWatermark(Watermark)}
   *
   * @param element data element
   */
  @Override
  public void onData(final WindowedValue<KV<K, InputT>> element) {
    checkAndInvokeBundle();

    // We can call Beam's DoFnRunner#processElement here,
    // but it may generate some overheads if we call the method for each data.
    // The `processElement` requires a `Iterator` of data, so we emit the buffered data every watermark.
    // TODO #250: But, this approach can delay the event processing in streaming,
    // TODO #250: if the watermark is not triggered for a long time.
    final KV<K, InputT> kv = element.getValue();
    // computeIfAbsent allocates the buffer only the first time a key is seen.
    keyToValues.computeIfAbsent(kv.getKey(), k -> new ArrayList<>())
      .add(element.withValue(kv.getValue()));

    checkAndFinishBundle();
  }

  /**
   * Process the collected data and trigger timers.
   *
   * @param inputWatermark   current input watermark
   * @param processingTime   processing time
   * @param synchronizedTime synchronized time
   */
  private void processElementsAndTriggerTimers(final Watermark inputWatermark,
                                               final Instant processingTime,
                                               final Instant synchronizedTime) {
    for (final Map.Entry<K, List<WindowedValue<InputT>>> entry : keyToValues.entrySet()) {
      final K key = entry.getKey();
      final List<WindowedValue<InputT>> values = entry.getValue();

      // for each key
      // Process elements
      if (!values.isEmpty()) {
        final KeyedWorkItem<K, InputT> keyedWorkItem =
          KeyedWorkItems.elementsWorkItem(key, values);
        // The DoFnRunner interface requires WindowedValue,
        // but this windowed value is actually not used in the ReduceFnRunner internal.
        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
      }

      // Trigger timers
      triggerTimers(key, inputWatermark, processingTime, synchronizedTime);

      // Remove values. Only the buffer is emptied: the key stays in the map,
      // so its timers are still checked on subsequent calls.
      values.clear();
    }
  }

  /**
   * Output watermark
   * = max(prev output watermark,
   * min(input watermark, watermark holds)).
   *
   * <p>The watermark is emitted only if it is strictly greater than the previously emitted one.
   *
   * @param inputWatermark input watermark
   */
  private void emitOutputWatermark(final Watermark inputWatermark) {
    // Find min watermark hold
    final Watermark minWatermarkHold = keyAndWatermarkHoldMap.isEmpty()
      ? new Watermark(Long.MAX_VALUE) // set this to MAX, in order to just use the input watermark.
      : Collections.min(keyAndWatermarkHoldMap.values());

    final Watermark outputWatermarkCandidate = new Watermark(
      Math.max(prevOutputWatermark.getTimestamp(),
        Math.min(minWatermarkHold.getTimestamp(), inputWatermark.getTimestamp())));

    // Note: the value logged as "outputWatermark" is the previously emitted one (before any update below).
    if (LOG.isDebugEnabled()) {
      LOG.debug("Watermark hold: {}, "
        + "inputWatermark: {}, outputWatermark: {}", minWatermarkHold, inputWatermark, prevOutputWatermark);
    }

    if (outputWatermarkCandidate.getTimestamp() > prevOutputWatermark.getTimestamp()) {
      // progress!
      prevOutputWatermark = outputWatermarkCandidate;
      // emit watermark
      getOutputCollector().emitWatermark(outputWatermarkCandidate);
      // Remove minimum watermark holds
      if (minWatermarkHold.getTimestamp() == outputWatermarkCandidate.getTimestamp()) {
        keyAndWatermarkHoldMap.entrySet()
          .removeIf(entry -> entry.getValue().getTimestamp() == minWatermarkHold.getTimestamp());
      }
    }
  }

  /**
   * Processes the buffered elements, fires eligible timers and then emits the output watermark.
   *
   * @param inputWatermark the input watermark.
   */
  @Override
  public void onWatermark(final Watermark inputWatermark) {
    checkAndInvokeBundle();
    // Read the clock once so that processing time and synchronized time are the same instant.
    final Instant now = Instant.now();
    processElementsAndTriggerTimers(inputWatermark, now, now);
    // Emit watermark to downstream operators
    emitOutputWatermark(inputWatermark);
    checkAndFinishBundle();
  }

  /**
   * This advances the input watermark and processing time to the timestamp max value
   * in order to emit all data.
   */
  @Override
  protected void beforeClose() {
    // Finish any pending windows by advancing the input watermark to infinity.
    processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
      BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
  }

  /**
   * Trigger timers for current key.
   * When triggering, it emits the windowed data to downstream operators.
   *
   * @param key              key
   * @param watermark        watermark
   * @param processingTime   processing time
   * @param synchronizedTime synchronized time
   */
  private void triggerTimers(final K key,
                             final Watermark watermark,
                             final Instant processingTime,
                             final Instant synchronizedTime) {
    final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
      inMemoryTimerInternalsFactory.timerInternalsForKey(key);
    try {
      timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
      timerInternals.advanceProcessingTime(processingTime);
      timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
    } catch (final Exception e) {
      throw new RuntimeException(e);
    }

    final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);

    if (!timerDataList.isEmpty()) {
      // Trigger timers and emit windowed data
      final KeyedWorkItem<K, InputT> timerWorkItem =
        KeyedWorkItems.timersWorkItem(key, timerDataList);
      // The DoFnRunner interface requires WindowedValue,
      // but this windowed value is actually not used in the ReduceFnRunner internal.
      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(timerWorkItem));
    }
  }

  /**
   * Get timer data.
   * Event-time, processing-time and synchronized-processing-time timers are drained repeatedly
   * until a full pass removes no timer.
   *
   * @param timerInternals in-memory timer internals.
   * @return list of timer datas.
   */
  private List<TimerInternals.TimerData> getEligibleTimers(final InMemoryTimerInternals timerInternals) {
    // The list is only appended to and then iterated, so an ArrayList is sufficient.
    final List<TimerInternals.TimerData> timerData = new ArrayList<>();

    while (true) {
      TimerInternals.TimerData timer;
      boolean hasFired = false;

      while ((timer = timerInternals.removeNextEventTimer()) != null) {
        hasFired = true;
        timerData.add(timer);
      }
      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
        hasFired = true;
        timerData.add(timer);
      }
      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
        hasFired = true;
        timerData.add(timer);
      }
      if (!hasFired) {
        break;
      }
    }

    return timerData;
  }

  /**
   * State and timer internal.
   * Either field may be {@code null} until the corresponding factory is first asked for it.
   */
  final class StateAndTimerForKey {
    private StateInternals stateInternals;
    private TimerInternals timerInternals;

    /**
     * @param stateInternals state internals.
     * @param timerInternals timer internals.
     */
    StateAndTimerForKey(final StateInternals stateInternals,
                        final TimerInternals timerInternals) {
      this.stateInternals = stateInternals;
      this.timerInternals = timerInternals;
    }
  }

  /**
   * InMemoryStateInternalsFactory.
   */
  final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
    private final Map<K, StateAndTimerForKey> map;

    /**
     * @param map initial map.
     */
    InMemoryStateInternalsFactory(final Map<K, StateAndTimerForKey> map) {
      this.map = map;
    }

    /**
     * Returns the state internals of the key, creating them on first use.
     *
     * @param key key.
     * @return the state internals stored for the key.
     */
    @Override
    public StateInternals stateInternalsForKey(final K key) {
      // Create the holder lazily; nothing is allocated when the key is already known.
      final StateAndTimerForKey stateAndTimerForKey =
        map.computeIfAbsent(key, k -> new StateAndTimerForKey(null, null));

      // The holder may have been created by the timer factory without state internals.
      if (stateAndTimerForKey.stateInternals == null) {
        stateAndTimerForKey.stateInternals = InMemoryStateInternals.forKey(key);
      }

      return stateAndTimerForKey.stateInternals;
    }
  }

  /**
   * InMemoryTimerInternalsFactory.
   */
  final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
    private final Map<K, StateAndTimerForKey> map;

    /**
     * @param map initial map.
     */
    InMemoryTimerInternalsFactory(final Map<K, StateAndTimerForKey> map) {
      this.map = map;
    }

    /**
     * Returns the timer internals of the key, creating them on first use.
     *
     * @param key key.
     * @return the timer internals stored for the key.
     */
    @Override
    public TimerInternals timerInternalsForKey(final K key) {
      // Create the holder lazily; nothing is allocated when the key is already known.
      final StateAndTimerForKey stateAndTimerForKey =
        map.computeIfAbsent(key, k -> new StateAndTimerForKey(null, null));

      // The holder may have been created by the state factory without timer internals.
      if (stateAndTimerForKey.timerInternals == null) {
        stateAndTimerForKey.timerInternals = new InMemoryTimerInternals();
      }

      return stateAndTimerForKey.timerInternals;
    }
  }

  /**
   * This class wraps the output collector to track the watermark hold of each key.
   */
  final class GBKWOutputCollector implements OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> {
    private final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector;

    /**
     * @param outputCollector output collector.
     */
    GBKWOutputCollector(final OutputCollector<WindowedValue<KV<K, Iterable<InputT>>>> outputCollector) {
      this.outputCollector = outputCollector;
    }

    /**
     * Records the watermark hold of the key when the pane is ON_TIME, then forwards the output.
     *
     * @param output the output to emit.
     */
    @Override
    public void emit(final WindowedValue<KV<K, Iterable<InputT>>> output) {

      // The watermark advances only in ON_TIME
      if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) {
        final K key = output.getValue().getKey();
        final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
          inMemoryTimerInternalsFactory.timerInternalsForKey(key);
        // adds the output timestamp to the watermark hold of each key
        // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999
        final long holdMillis = output.getTimestamp().getMillis() + 1;
        keyAndWatermarkHoldMap.put(key, new Watermark(holdMillis));
        timerInternals.advanceOutputWatermark(new Instant(holdMillis));
      }
      outputCollector.emit(output);
    }

    @Override
    public void emitWatermark(final Watermark watermark) {
      outputCollector.emitWatermark(watermark);
    }

    @Override
    public <T> void emit(final String dstVertexId, final T output) {
      outputCollector.emit(dstVertexId, output);
    }
  }
}