[NEMO-456] Remove unnecessary shuffling processes in Combine transformation (#302)
JIRA: [NEMO-456: Remove unnecessary shuffling processes in Combine transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-456)
**Major changes:**
- Removed the unnecessary shuffling process before the partial combine transform
**Tests for the changes:**
- Updated GBKTransformTest to pass the new `isPartialCombining` constructor argument of GBKTransform.
Closes #302
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 2d1b90b..28aafb2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -275,7 +275,9 @@
if (srcTransform instanceof FlattenTransform) {
return CommunicationPatternProperty.Value.ONE_TO_ONE;
}
- if (dstTransform instanceof GBKTransform
+ // If GBKTransform represents a partial CombinePerKey transformation, we do NOT need to shuffle its input,
+ // since its output will be shuffled before going through a final CombinePerKey transformation.
+ if ((dstTransform instanceof GBKTransform && !((GBKTransform) dstTransform).getIsPartialCombining())
|| dstTransform instanceof GroupByKeyTransform) {
return CommunicationPatternProperty.Value.SHUFFLE;
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 1e429c3..21ded1c 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -384,8 +384,8 @@
// Batch processing, using CombinePartialTransform and CombineFinalTransform
partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
- // Stream data processing, using GBKTransform
} else {
+ // Stream data processing, using GBKTransform
final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn(
(Combine.CombineFn) combineFn, accumulatorCoder);
@@ -414,7 +414,8 @@
ctx.getPipelineOptions(),
partialSystemReduceFn,
DoFnSchemaInformation.create(),
- DisplayData.from(beamNode.getTransform()));
+ DisplayData.from(beamNode.getTransform()),
+ true);
final GBKTransform finalCombineStreamTransform =
new GBKTransform(
@@ -424,7 +425,8 @@
ctx.getPipelineOptions(),
finalSystemReduceFn,
DoFnSchemaInformation.create(),
- DisplayData.from(beamNode.getTransform()));
+ DisplayData.from(beamNode.getTransform()),
+ false);
partialCombine = new OperatorVertex(partialCombineStreamTransform);
finalCombine = new OperatorVertex(finalCombineStreamTransform);
@@ -568,7 +570,8 @@
ctx.getPipelineOptions(),
SystemReduceFn.buffering(mainInput.getCoder()),
DoFnSchemaInformation.create(),
- DisplayData.from(beamNode.getTransform()));
+ DisplayData.from(beamNode.getTransform()),
+ false);
}
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 37952ea..9dd2e5a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -39,8 +39,8 @@
import java.util.*;
/**
- * This transform performs GroupByKey or CombinePerKey operation when input data is unbounded or is not in
- * global window.
+ * This transform executes a GroupByKey or CombinePerKey transformation when the input data is unbounded
+ * or is not in a global window.
* @param <K> key type
* @param <InputT> input type
* @param <OutputT> output type
@@ -56,6 +56,7 @@
private Watermark inputWatermark = new Watermark(Long.MIN_VALUE);
private boolean dataReceived = false;
private transient OutputCollector originOc;
+ private final boolean isPartialCombining;
public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
final TupleTag<KV<K, OutputT>> mainOutputTag,
@@ -63,7 +64,8 @@
final PipelineOptions options,
final SystemReduceFn reduceFn,
final DoFnSchemaInformation doFnSchemaInformation,
- final DisplayData displayData) {
+ final DisplayData displayData,
+ final boolean isPartialCombining) {
super(null,
null,
outputCoders,
@@ -76,6 +78,7 @@
doFnSchemaInformation,
Collections.emptyMap()); /* does not have side inputs */
this.reduceFn = reduceFn;
+ this.isPartialCombining = isPartialCombining;
}
/**
@@ -122,19 +125,19 @@
*/
@Override
public void onData(final WindowedValue<KV<K, InputT>> element) {
- dataReceived = true;
- try {
- checkAndInvokeBundle();
- final KV<K, InputT> kv = element.getValue();
- final KeyedWorkItem<K, InputT> keyedWorkItem =
- KeyedWorkItems.elementsWorkItem(kv.getKey(),
- Collections.singletonList(element.withValue(kv.getValue())));
- getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
- checkAndFinishBundle();
- } catch (final Exception e) {
- e.printStackTrace();
- throw new RuntimeException("Exception triggered element " + element.toString());
- }
+ dataReceived = true;
+ try {
+ checkAndInvokeBundle();
+ final KV<K, InputT> kv = element.getValue();
+ final KeyedWorkItem<K, InputT> keyedWorkItem =
+ KeyedWorkItems.elementsWorkItem(kv.getKey(),
+ Collections.singletonList(element.withValue(kv.getValue())));
+ getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+ checkAndFinishBundle();
+ } catch (final Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Exception triggered element " + element.toString());
+ }
}
/**
@@ -183,8 +186,8 @@
* @param watermark watermark
*/
private void triggerTimers(final Instant processingTime,
- final Instant synchronizedTime,
- final Watermark watermark) {
+ final Instant synchronizedTime,
+ final Watermark watermark) {
final Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
while (iter.hasNext()) {
@@ -259,6 +262,12 @@
}
}
+ /** Returns whether this transform is the partial-combining stage of a CombinePerKey transformation. */
+ public boolean getIsPartialCombining() {
+ return isPartialCombining;
+ }
+
+
/** Wrapper class for {@link OutputCollector}. */
public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;
@@ -278,9 +287,9 @@
(InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key);
// Add the output timestamp to the watermark hold of each key.
// +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999.
- keyOutputWatermarkMap.put(key,
- new Watermark(output.getTimestamp().getMillis() + 1));
- timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+ keyOutputWatermarkMap.put(key,
+ new Watermark(output.getTimestamp().getMillis() + 1));
+ timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
}
oc.emit(output);
}
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
index 9a98932..45933b0 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -161,7 +161,8 @@
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
DoFnSchemaInformation.create(),
- DisplayData.none());
+ DisplayData.none(),
+ false);
// window1 : [-5000, 5000) in millisecond
// window2 : [0, 10000)
@@ -288,7 +289,8 @@
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
DoFnSchemaInformation.create(),
- DisplayData.none());
+ DisplayData.none(),
+ false);
// window1 : [-5000, 5000) in millisecond
// window2 : [0, 10000)
@@ -381,7 +383,8 @@
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(STRING_CODER),
DoFnSchemaInformation.create(),
- DisplayData.none());
+ DisplayData.none(),
+ false);
final Instant ts1 = new Instant(1);
final Instant ts2 = new Instant(100);
@@ -567,7 +570,8 @@
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(STRING_CODER),
DoFnSchemaInformation.create(),
- DisplayData.none());
+ DisplayData.none(),
+ false);
final Transform.Context context = mock(Transform.Context.class);