[NEMO-456] Remove unnecessary shuffling processes in Combine transformation (#302)

JIRA: [NEMO-456: Remove unnecessary shuffling processes in Combine transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-456)

**Major changes:**
- Removed the unnecessary shuffling process before the partial combine transform

**Tests for the changes:**
- Modified GBKTransformTest.

Closes #302
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
index 2d1b90b..28aafb2 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
@@ -275,7 +275,9 @@
     if (srcTransform instanceof FlattenTransform) {
       return CommunicationPatternProperty.Value.ONE_TO_ONE;
     }
-    if (dstTransform instanceof GBKTransform
+    // If GBKTransform represents a partial CombinePerKey transformation, we do NOT need to shuffle its input,
+    // since its output will be shuffled before going through a final CombinePerKey transformation.
+    if ((dstTransform instanceof GBKTransform && !((GBKTransform) dstTransform).getIsPartialCombining())
       || dstTransform instanceof GroupByKeyTransform) {
       return CommunicationPatternProperty.Value.SHUFFLE;
     }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 1e429c3..21ded1c 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -384,8 +384,8 @@
       // Batch processing, using CombinePartialTransform and CombineFinalTransform
       partialCombine = new OperatorVertex(new CombineFnPartialTransform<>(combineFn));
       finalCombine = new OperatorVertex(new CombineFnFinalTransform<>(combineFn));
-      // Stream data processing, using GBKTransform
     } else {
+      // Stream data processing, using GBKTransform
       final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
       final CombineFnBase.GlobalCombineFn partialCombineFn = new PartialCombineFn(
         (Combine.CombineFn) combineFn, accumulatorCoder);
@@ -414,7 +414,8 @@
           ctx.getPipelineOptions(),
           partialSystemReduceFn,
           DoFnSchemaInformation.create(),
-          DisplayData.from(beamNode.getTransform()));
+          DisplayData.from(beamNode.getTransform()),
+          true);
 
       final GBKTransform finalCombineStreamTransform =
         new GBKTransform(
@@ -424,7 +425,8 @@
           ctx.getPipelineOptions(),
           finalSystemReduceFn,
           DoFnSchemaInformation.create(),
-          DisplayData.from(beamNode.getTransform()));
+          DisplayData.from(beamNode.getTransform()),
+          false);
 
       partialCombine = new OperatorVertex(partialCombineStreamTransform);
       finalCombine = new OperatorVertex(finalCombineStreamTransform);
@@ -568,7 +570,8 @@
         ctx.getPipelineOptions(),
         SystemReduceFn.buffering(mainInput.getCoder()),
         DoFnSchemaInformation.create(),
-        DisplayData.from(beamNode.getTransform()));
+        DisplayData.from(beamNode.getTransform()),
+        false);
     }
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 37952ea..9dd2e5a 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -39,8 +39,8 @@
 import java.util.*;
 
 /**
- * This transform performs GroupByKey or CombinePerKey operation when input data is unbounded or is not in
- * global window.
+ * This transform executes GroupByKey transformation and CombinePerKey transformation when input data is unbounded
+ * or is not in a global window.
  * @param <K> key type
  * @param <InputT> input type
  * @param <OutputT> output type
@@ -56,6 +56,7 @@
   private Watermark inputWatermark = new Watermark(Long.MIN_VALUE);
   private boolean dataReceived = false;
   private transient OutputCollector originOc;
+  private final boolean isPartialCombining;
 
   public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders,
                       final TupleTag<KV<K, OutputT>> mainOutputTag,
@@ -63,7 +64,8 @@
                       final PipelineOptions options,
                       final SystemReduceFn reduceFn,
                       final DoFnSchemaInformation doFnSchemaInformation,
-                      final DisplayData displayData) {
+                      final DisplayData displayData,
+                      final boolean isPartialCombining) {
     super(null,
       null,
       outputCoders,
@@ -76,6 +78,7 @@
       doFnSchemaInformation,
       Collections.emptyMap()); /* does not have side inputs */
     this.reduceFn = reduceFn;
+    this.isPartialCombining = isPartialCombining;
   }
 
   /**
@@ -122,19 +125,19 @@
    */
   @Override
   public void onData(final WindowedValue<KV<K, InputT>> element) {
-      dataReceived = true;
-      try {
-        checkAndInvokeBundle();
-        final KV<K, InputT> kv = element.getValue();
-        final KeyedWorkItem<K, InputT> keyedWorkItem =
-          KeyedWorkItems.elementsWorkItem(kv.getKey(),
-            Collections.singletonList(element.withValue(kv.getValue())));
-        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
-        checkAndFinishBundle();
-      } catch (final Exception e) {
-        e.printStackTrace();
-        throw new RuntimeException("Exception triggered element " + element.toString());
-      }
+    dataReceived = true;
+    try {
+      checkAndInvokeBundle();
+      final KV<K, InputT> kv = element.getValue();
+      final KeyedWorkItem<K, InputT> keyedWorkItem =
+        KeyedWorkItems.elementsWorkItem(kv.getKey(),
+          Collections.singletonList(element.withValue(kv.getValue())));
+      getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      checkAndFinishBundle();
+    } catch (final Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Exception triggered element " + element.toString());
+    }
   }
 
   /**
@@ -183,8 +186,8 @@
    * @param watermark watermark
    */
   private void triggerTimers(final Instant processingTime,
-                            final Instant synchronizedTime,
-                            final Watermark watermark) {
+                             final Instant synchronizedTime,
+                             final Watermark watermark) {
     final Iterator<Map.Entry<K, InMemoryTimerInternals>> iter =
       inMemoryTimerInternalsFactory.getTimerInternalsMap().entrySet().iterator();
     while (iter.hasNext()) {
@@ -259,6 +262,12 @@
     }
   }
 
+  /** Accessor for isPartialCombining. */
+  public boolean getIsPartialCombining() {
+    return isPartialCombining;
+  }
+
+
   /** Wrapper class for {@link OutputCollector}. */
   public class GBKOutputCollector implements OutputCollector<WindowedValue<KV<K, OutputT>>> {
     private final OutputCollector<WindowedValue<KV<K, OutputT>>> oc;
@@ -278,9 +287,9 @@
           (InMemoryTimerInternals) inMemoryTimerInternalsFactory.timerInternalsForKey(key);
         // Add the output timestamp to the watermark hold of each key.
         // +1 to the output timestamp because if the window is [0-5000), the timestamp is 4999.
-          keyOutputWatermarkMap.put(key,
-            new Watermark(output.getTimestamp().getMillis() + 1));
-          timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
+        keyOutputWatermarkMap.put(key,
+          new Watermark(output.getTimestamp().getMillis() + 1));
+        timerInternals.advanceOutputWatermark(new Instant(output.getTimestamp().getMillis() + 1));
       }
       oc.emit(output);
     }
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
index 9a98932..45933b0 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java
@@ -161,7 +161,8 @@
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
         DoFnSchemaInformation.create(),
-        DisplayData.none());
+        DisplayData.none(),
+        false);
 
     // window1 : [-5000, 5000) in millisecond
     // window2 : [0, 10000)
@@ -288,7 +289,8 @@
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.combining(STRING_CODER, applied_combine_fn),
         DoFnSchemaInformation.create(),
-        DisplayData.none());
+        DisplayData.none(),
+        false);
 
     // window1 : [-5000, 5000) in millisecond
     // window2 : [0, 10000)
@@ -381,7 +383,8 @@
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.buffering(STRING_CODER),
         DoFnSchemaInformation.create(),
-        DisplayData.none());
+        DisplayData.none(),
+        false);
 
     final Instant ts1 = new Instant(1);
     final Instant ts2 = new Instant(100);
@@ -567,7 +570,8 @@
         PipelineOptionsFactory.as(NemoPipelineOptions.class),
         SystemReduceFn.buffering(STRING_CODER),
         DoFnSchemaInformation.create(),
-        DisplayData.none());
+        DisplayData.none(),
+        false);
 
 
     final Transform.Context context = mock(Transform.Context.class);