[NEMO-429] SWPP TEAM19 Code Smell Fix (#269)

JIRA: [NEMO-429: TEAM19](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)

**Major changes:**
- Fixed code smells (SWPP Code Smell session)

**Minor changes to note:**
- As TA suggested, we add ```TODO #430: Pair elements should be serializable``` at Pair.java
diff --git a/common/src/main/java/org/apache/nemo/common/Pair.java b/common/src/main/java/org/apache/nemo/common/Pair.java
index d14203f..b8189ad 100644
--- a/common/src/main/java/org/apache/nemo/common/Pair.java
+++ b/common/src/main/java/org/apache/nemo/common/Pair.java
@@ -28,6 +28,7 @@
  * @param <B> type of the right element.
  */
 public final class Pair<A, B> implements Serializable {
+  // TODO #430: Pair elements should be serializable
   private final A left;
   private final B right;
 
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 6d5b232..dc38b9e 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -60,7 +60,7 @@
    */
   private final class BytesDecoder implements Decoder<byte[]> {
 
-    private final InputStream inputStream;
+    private final transient InputStream inputStream;
     private boolean returnedArray;
 
     /**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index 0f6ea2e..00280ed 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -88,7 +88,7 @@
     that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge));
     that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge));
     that.dagOutgoingEdges.forEach(((v, es) -> es.forEach(this::addDagOutgoingEdge)));
-    that.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) -> this.mapEdgeWithLoop(eLoop, eInternal));
+    that.edgeWithLoopToEdgeWithInternalVertex.forEach(this::mapEdgeWithLoop);
     this.maxNumberOfIterations = that.maxNumberOfIterations;
     this.terminationCondition = that.terminationCondition;
   }
@@ -106,7 +106,7 @@
   }
 
   /**
-   * @return the DAG of rthe LoopVertex
+   * @return the DAG of the LoopVertex
    */
   public DAG<IRVertex, IREdge> getDAG() {
     return builder.buildWithoutSourceSinkCheck();
diff --git a/common/src/test/java/org/apache/nemo/common/DAGTest.java b/common/src/test/java/org/apache/nemo/common/DAGTest.java
index 91708f2..60e7f15 100644
--- a/common/src/test/java/org/apache/nemo/common/DAGTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DAGTest.java
@@ -138,7 +138,7 @@
     assertEquals(descendants.size(), 0);
 
     descendants = dag.getDescendants("2");
-    assertEquals(descendants.size(), 1);
+    assertEquals(1, descendants.size());
     assertTrue(descendants.contains(new IntegerVertex(3)));
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
index c8fe064..9915760 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -109,8 +109,8 @@
 
     this.curWatermark = newWatermark;
     // TODO #282: Handle late data
-    inMemorySideInputs.entrySet().removeIf(entry -> {
-      return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
-    });
+    inMemorySideInputs.entrySet().removeIf(entry ->
+      entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark // Discard old sideinputs.
+    );
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index d4e3185..32d4d11 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -78,7 +78,7 @@
    */
   @Override
   public void write(final K key,
-                    final Object element) throws BlockWriteException {
+                    final Object element) {
     if (committed) {
       throw new BlockWriteException(new Throwable("The partition is already committed!"));
     } else {
@@ -104,7 +104,7 @@
    * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) throws BlockWriteException {
+  public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) {
     if (!committed) {
       try {
         final Iterable<SerializedPartition<K>> convertedPartitions = DataUtil.convertToSerPartitions(
@@ -127,7 +127,7 @@
    * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) throws BlockWriteException {
+  public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) {
     if (!committed) {
       partitions.forEach(serializedPartitions::add);
     } else {
@@ -145,7 +145,7 @@
    * @throws BlockFetchException for any error occurred while trying to fetch a block.
    */
   @Override
-  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+  public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) {
     try {
       return DataUtil.convertToNonSerPartitions(serializer, readSerializedPartitions(keyRange));
     } catch (final IOException e) {
@@ -162,7 +162,7 @@
    * @throws BlockFetchException for any error occurred while trying to fetch a block.
    */
   @Override
-  public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
+  public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) {
     if (committed) {
       final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
       serializedPartitions.forEach(serializedPartition -> {
@@ -186,7 +186,7 @@
    * @throws BlockWriteException for any error occurred while trying to write a block.
    */
   @Override
-  public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
+  public synchronized Optional<Map<K, Long>> commit() {
     try {
       if (!committed) {
         commitPartitions();
@@ -213,7 +213,7 @@
    * Commits all un-committed partitions.
    */
   @Override
-  public synchronized void commitPartitions() throws BlockWriteException {
+  public synchronized void commitPartitions() {
     try {
       for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
         partition.commit();