[NEMO-429] SWPP TEAM4 Code Smell Fix (#270)

JIRA: [NEMO-429: SWPP TEAM4 Code Smell Fix](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)

**Major changes:**
- Fixed code smells
diff --git a/common/src/main/java/org/apache/nemo/common/StateMachine.java b/common/src/main/java/org/apache/nemo/common/StateMachine.java
index bc76b61..75c3a65 100644
--- a/common/src/main/java/org/apache/nemo/common/StateMachine.java
+++ b/common/src/main/java/org/apache/nemo/common/StateMachine.java
@@ -255,7 +255,7 @@
      */
     public Builder addState(final Enum stateEnum, final String description) {
       if (stateEnumSet.contains(stateEnum)) {
-        throw new RuntimeException("A state " + stateEnum + " was already added");
+        throw new RuntimeException(stateEnum + " was already added");
       }
 
       stateEnumSet.add(stateEnum);
@@ -269,9 +269,8 @@
      * @throws RuntimeException if the initial state was not added first
      */
     public Builder setInitialState(final Enum stateToSet) {
-      if (!stateEnumSet.contains(stateToSet)) {
-        throw new RuntimeException("A state " + stateToSet + " should be added first");
-      }
+      checkStateWasAdded(stateToSet);
+
       this.initialState = stateToSet;
       return this;
     }
@@ -287,13 +286,8 @@
      *                          was already added
      */
     public Builder addTransition(final Enum from, final Enum to, final String description) {
-      if (!stateEnumSet.contains(from)) {
-        throw new RuntimeException("A state " + from + " should be added first");
-      }
-
-      if (!stateEnumSet.contains(to)) {
-        throw new RuntimeException("A state " + to + " should be added first");
-      }
+      checkStateWasAdded(from);
+      checkStateWasAdded(to);
 
       final Pair<Enum, String> transition = Pair.of(to, description);
 
@@ -336,5 +330,12 @@
 
       return new StateMachine(stateMap, initialState);
     }
+
+    private void checkStateWasAdded(final Enum state) {
+      if (!stateEnumSet.contains(state)) {
+        throw new RuntimeException("State " + state + " should be added first");
+      }
+    }
+
   }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index 37e2463..bcb76dd 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -217,7 +217,7 @@
       modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
       final Optional<Integer> deletedMessageIdOptional = vertexGroupToDelete.stream()
         .filter(vtd -> vtd instanceof MessageAggregatorVertex)
-        .map(vtd -> vtd.getPropertyValue(MessageIdVertexProperty.class).orElseThrow(
+        .map(vtd -> vtd.getPropertyValue(MessageIdVertexProperty.class).<IllegalArgumentException>orElseThrow(
           () -> new IllegalArgumentException(
             "MessageAggregatorVertex " + vtd.getId() + " does not have MessageIdVertexProperty.")))
         .findAny();
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
index 3c36ba6..55be8f7 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/GroupByKeyTransform.java
@@ -30,6 +30,7 @@
 
 /**
  * Transform for group by key transformation.
+ * TODO #431: Handle states in Transforms better
  *
  * @param <K> key type.
  * @param <V> value type.
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
index 8965ae2..127c701 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/LocalTextFileTransform.java
@@ -28,6 +28,7 @@
 
 /**
  * Transform which saves elements to a local text file for Spark.
+ * TODO #431: Handle states in Transforms better
  *
  * @param <I> input type.
  */
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewHandlingUtil.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewHandlingUtil.java
index 8e9f597..fa4d728 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewHandlingUtil.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewHandlingUtil.java
@@ -68,13 +68,15 @@
 
   static EncoderProperty getEncoder(final IREdge irEdge) {
     return EncoderProperty.of(PairEncoderFactory
-      .of(irEdge.getPropertyValue(KeyEncoderProperty.class).orElseThrow(IllegalStateException::new),
+      .of(irEdge.getPropertyValue(KeyEncoderProperty.class)
+          .<IllegalStateException>orElseThrow(IllegalStateException::new),
         LongEncoderFactory.of()));
   }
 
   static DecoderProperty getDecoder(final IREdge irEdge) {
     return DecoderProperty.of(PairDecoderFactory
-      .of(irEdge.getPropertyValue(KeyDecoderProperty.class).orElseThrow(IllegalStateException::new),
+      .of(irEdge.getPropertyValue(KeyDecoderProperty.class).
+          <IllegalStateException>orElseThrow(IllegalStateException::new),
         LongDecoderFactory.of()));
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 4aed25d..53c8f77 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -26,7 +26,6 @@
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.nemo.common.KeyRange;
 import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.common.exception.UnsupportedBlockStoreException;
 import org.apache.nemo.common.exception.UnsupportedExecutionPropertyException;
 import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
@@ -160,10 +159,9 @@
    * @param blockId    the ID of the block to create.
    * @param blockStore the store to place the block.
    * @return the created block.
-   * @throws BlockWriteException for any error occurred while trying to create a block.
    */
   public Block createBlock(final String blockId,
-                           final DataStoreProperty.Value blockStore) throws BlockWriteException {
+                           final DataStoreProperty.Value blockStore) {
     final BlockStore store = getBlockStore(blockStore);
     return store.createBlock(blockId);
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/FileMetadata.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/FileMetadata.java
index da44a52..cce6800 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/FileMetadata.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/metadata/FileMetadata.java
@@ -81,9 +81,8 @@
    * Gets a list containing the partition metadata of corresponding block.
    *
    * @return the list containing the partition metadata.
-   * @throws IOException if fail to get the iterable.
    */
-  public final List<PartitionMetadata<K>> getPartitionMetadataList() throws IOException {
+  public final List<PartitionMetadata<K>> getPartitionMetadataList() {
     return Collections.unmodifiableList(partitionMetadataList);
   }