[NEMO-429] SWPP TEAM3 Code Smell Fix (#265)

JIRA: [NEMO-429: SWPP TEAM 3 Code Smell Fix](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)

**Major changes:**
- Fixed code smells(SWPP Code Smell Session) 
diff --git a/common/src/test/java/org/apache/nemo/common/DAGTest.java b/common/src/test/java/org/apache/nemo/common/DAGTest.java
index e735fef..629d525 100644
--- a/common/src/test/java/org/apache/nemo/common/DAGTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DAGTest.java
@@ -66,8 +66,8 @@
     assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(1)).size(), 0);
     assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(5)).size(), 0);
     assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(3)).size(), 1);
-    assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(4)).size(), 1);
-    assertEquals(dag.getTopologicalSort().size(), 5);
+    assertEquals(1, dag.getOutgoingEdgesOf(new IntegerVertex(4)).size());
+    assertEquals(5, dag.getTopologicalSort().size());
 
     final List<IntegerVertex> topologicalOrder = dag.getTopologicalSort();
     assertEquals(topologicalOrder.get(0).getValue(), 1);
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index 9a66d72..2d356c0 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -39,6 +39,7 @@
 public final class ReduceByKeyTransform<K, V> extends NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
   private static final Logger LOG = LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
 
+  // TODO #431: Handle states in Transforms better
   private final Map<K, List<V>> keyToValues;
   private final Function2<V, V, V> func;
   private OutputCollector<Tuple2<K, V>> outputCollector;
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index b7a445f..3c5e2d7 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -34,6 +34,7 @@
 public final class ReduceTransform<T> implements Transform<T, T> {
   private final Function2<T, T, T> func;
   private OutputCollector<T> outputCollector;
+  // TODO #431: Handle states in Transforms better
   private T result;
 
   /**
@@ -41,14 +42,15 @@
    *
    * @param func function to run for the reduce transform.
    */
+  // TODO #432: ReduceTransform Unit Test
   public ReduceTransform(final Function2<T, T, T> func) {
     this.func = func;
-    this.result = null;
   }
 
   @Override
   public void prepare(final Context context, final OutputCollector<T> oc) {
     this.outputCollector = oc;
+    this.result = null;
   }
 
   @Override
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
index 00c1f52..c86b756 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
@@ -74,6 +74,7 @@
   public static final String POSTGRESQL_METADATA_DB_NAME =
     "jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
   private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";
+  private static final String SAVING_METADATA_FAIL_MSG = "Saving of Metadata to DB failed: ";
 
   /**
    * Private constructor.
@@ -166,7 +167,7 @@
             try {
               insertOrUpdateMetadata(c, "EP_KEY_METADATA", l, r);
             } catch (SQLException e) {
-              LOG.warn("Saving of Metadata to DB failed: ", e);
+              LOG.warn(SAVING_METADATA_FAIL_MSG, e);
             }
           });
           LOG.info("EP Key Metadata saved to DB.");
@@ -177,14 +178,14 @@
             try {
               insertOrUpdateMetadata(c, "EP_METADATA", l.left() * 10000 + l.right(), r);
             } catch (SQLException e) {
-              LOG.warn("Saving of Metadata to DB failed: ", e);
+              LOG.warn(SAVING_METADATA_FAIL_MSG, e);
             }
           });
           LOG.info("EP Metadata saved to DB.");
         }
       }
     } catch (SQLException e) {
-      LOG.warn("Saving of Metadata to DB failed: ", e);
+      LOG.warn(SAVING_METADATA_FAIL_MSG, e);
     }
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
index bd87031..6ffdf5b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -63,7 +63,7 @@
    * @throws BlockWriteException if fail to write.
    */
   @Override
-  public void writeBlock(final Block block) throws BlockWriteException {
+  public void writeBlock(final Block block) {
     if (!(block instanceof NonSerializedMemoryBlock)) {
       throw new BlockWriteException(new Throwable(
         this.toString() + "only accept " + NonSerializedPartition.class.getName()));
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index de46951..19b8382 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -62,7 +62,7 @@
    * @throws BlockWriteException if fail to write.
    */
   @Override
-  public void writeBlock(final Block block) throws BlockWriteException {
+  public void writeBlock(final Block block) {
     if (!(block instanceof SerializedMemoryBlock)) {
       throw new BlockWriteException(new Throwable(
         this.toString() + "only accept " + SerializedMemoryBlock.class.getName()));
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index 1466f4e..4c3d112 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -172,7 +172,7 @@
       .map(edge -> edge.getExecutionProperties()
         .get(MessageIdEdgeProperty.class)
         .<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
-      .findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
+      .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
     // Type casting is needed. See: https://stackoverflow.com/a/40865318
 
     return messageIds.iterator().next();
@@ -286,7 +286,7 @@
   public void onSpeculativeExecutionCheck() {
     MutableBoolean isNewCloneCreated = new MutableBoolean(false);
 
-    selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
+    selectEarliestSchedulableGroup().ifPresent(scheduleGroup ->
       scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
         final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
 
@@ -296,8 +296,8 @@
             isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
           }
         });
-      });
-    });
+      })
+    );
 
     if (isNewCloneCreated.booleanValue()) {
       doSchedule(); // Do schedule the new clone.
@@ -513,7 +513,7 @@
     final Stage stagePutOnHold = stageDag.getVertices().stream()
       .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
       .findFirst()
-      .orElseThrow(() -> new RuntimeException());
+      .orElseThrow(RuntimeException::new);
 
     // Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
     // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.