[NEMO-429] SWPP TEAM 21 code smell fix (#258)

JIRA: [NEMO-429: SWPP TEAM 21 code smell fix](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)

**Major changes:**
- Fix sonarcloud code smells
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
index a7cb634..cddbb53 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
@@ -35,11 +35,11 @@
 
   @Override
   public IRDAG apply(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
+    dag.getVertices().forEach(vertex ->
       dag.getIncomingEdgesOf(vertex).stream()
         .forEach(edge -> edge.setPropertyPermanently(
-          DataStoreProperty.of(DataStoreProperty.Value.PIPE)));
-    });
+          DataStoreProperty.of(DataStoreProperty.Value.PIPE)))
+    );
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewAnnotatingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewAnnotatingPass.java
index f328a84..b87ca83 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewAnnotatingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewAnnotatingPass.java
@@ -54,7 +54,7 @@
 
   @Override
   public IRDAG apply(final IRDAG dag) {
-    dag.topologicalDo(v -> {
+    dag.topologicalDo(v ->
       dag.getIncomingEdgesOf(v).forEach(e -> {
         if (CommunicationPatternProperty.Value.SHUFFLE
           .equals(e.getPropertyValue(CommunicationPatternProperty.class).get())
@@ -67,8 +67,8 @@
           e.setPropertyPermanently(PartitionerProperty.of(
             PartitionerProperty.Type.HASH, dstParallelism * HASH_RANGE_MULTIPLIER));
         }
-      });
-    });
+      })
+    );
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
index d84426f..3a32e5d 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/UpfrontCloningPass.java
@@ -45,7 +45,7 @@
         // only shuffle receivers (for now... as particular Beam sink operators fail when cloned)
         .anyMatch(edge ->
           edge.getPropertyValue(CommunicationPatternProperty.class)
-            .orElseThrow(() -> new IllegalStateException())
+            .orElseThrow(IllegalStateException::new)
             .equals(CommunicationPatternProperty.Value.SHUFFLE))
       )
       .forEach(vertex -> vertex.setProperty(
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
index 01a991e..91759bb 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleReshapingPass.java
@@ -39,14 +39,14 @@
 
   @Override
   public IRDAG apply(final IRDAG dag) {
-    dag.topologicalDo(vertex -> {
+    dag.topologicalDo(vertex ->
       dag.getIncomingEdgesOf(vertex).forEach(edge -> {
         if (CommunicationPatternProperty.Value.SHUFFLE
           .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
           dag.insert(new RelayVertex(), edge);
         }
-      });
-    });
+      })
+    );
     return dag;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/Block.java
index 83c4d95..61ceece 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/Block.java
@@ -19,8 +19,6 @@
 package org.apache.nemo.runtime.executor.data.block;
 
 import org.apache.nemo.common.KeyRange;
-import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 
@@ -42,12 +40,14 @@
    *
    * @param key     the key.
    * @param element the element to write.
-   * @throws BlockWriteException for any error occurred while trying to write a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockWriteException for any error occurred while trying to write a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  void write(K key, Object element) throws BlockWriteException;
+  void write(K key, Object element);
 
   /**
    * Stores {@link NonSerializedPartition}s to this block.
@@ -55,12 +55,14 @@
    * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link NonSerializedPartition}s to store.
-   * @throws BlockWriteException for any error occurred while trying to write a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockWriteException for any error occurred while trying to write a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  void writePartitions(Iterable<NonSerializedPartition<K>> partitions) throws BlockWriteException;
+  void writePartitions(Iterable<NonSerializedPartition<K>> partitions);
 
   /**
    * Stores {@link SerializedPartition}s to this block.
@@ -68,12 +70,14 @@
    * Invariant: This method does not support concurrent write.
    *
    * @param partitions the {@link SerializedPartition}s to store.
-   * @throws BlockWriteException for any error occurred while trying to write a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockWriteException for any error occurred while trying to write a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  void writeSerializedPartitions(Iterable<SerializedPartition<K>> partitions) throws BlockWriteException;
+  void writeSerializedPartitions(Iterable<SerializedPartition<K>> partitions);
 
   /**
    * Retrieves the {@link NonSerializedPartition}s in a specific key range from this block.
@@ -82,12 +86,14 @@
    *
    * @param keyRange the key range to retrieve.
    * @return an iterable of {@link NonSerializedPartition}s.
-   * @throws BlockFetchException for any error occurred while trying to fetch a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockFetchException for any error occurred while trying to fetch a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  Iterable<NonSerializedPartition<K>> readPartitions(KeyRange<K> keyRange) throws BlockFetchException;
+  Iterable<NonSerializedPartition<K>> readPartitions(KeyRange<K> keyRange);
 
   /**
    * Retrieves the {@link SerializedPartition}s in a specific key range.
@@ -95,23 +101,27 @@
    *
    * @param keyRange the hash range to retrieve.
    * @return an iterable of {@link SerializedPartition}s.
-   * @throws BlockFetchException for any error occurred while trying to fetch a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockFetchException for any error occurred while trying to fetch a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  Iterable<SerializedPartition<K>> readSerializedPartitions(KeyRange<K> keyRange) throws BlockFetchException;
+  Iterable<SerializedPartition<K>> readSerializedPartitions(KeyRange<K> keyRange);
 
   /**
    * Commits this block to prevent further write.
    *
    * @return the size of each partition if the data in the block is serialized.
-   * @throws BlockWriteException for any error occurred while trying to commit a block.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   *
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockWriteException for any error occurred while trying to commit a block.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  Optional<Map<K, Long>> commit() throws BlockWriteException;
+  Optional<Map<K, Long>> commit();
 
   /**
    * Commits all un-committed partitions.
@@ -120,12 +130,14 @@
    * If another element is written after this method is called, a new non-committed partition should be created
    * for the element even if a partition with the same key is committed already.
    *
-   * @throws BlockWriteException for any error occurred while trying to commit partitions.
-   *                             (This exception will be thrown to the scheduler
-   *                             through {@link org.apache.nemo.runtime.executor.Executor} and
-   *                             have to be handled by the scheduler with fault tolerance mechanism.)
+   * Classes implementing this interface may throw
+   * org.apache.nemo.common.exception.BlockWriteException for any error occurred
+   * while trying to commit partitions.
+   * (This exception will be thrown to the scheduler
+   * through {@link org.apache.nemo.runtime.executor.Executor} and
+   * have to be handled by the scheduler with fault tolerance mechanism.)
    */
-  void commitPartitions() throws BlockWriteException;
+  void commitPartitions();
 
   /**
    * @return the ID of this block.
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 9aaf455..37d4f0b 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -206,7 +206,7 @@
       // This should not be reached.
       fail();
     } catch (NullPointerException e) {
-      assertEquals(true, true);
+      assertTrue(true);
     }
   }