[NEMO-429] SWPP TEAM19 Code Smell Fix (#269)
JIRA: [NEMO-429: TEAM19](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)
**Major changes:**
- Fixed code smells (SWPP Code Smell session)
**Minor changes to note:**
- As TA suggested, we add ```TODO #430: Pair elements should be serializable``` at Pair.java
diff --git a/common/src/main/java/org/apache/nemo/common/Pair.java b/common/src/main/java/org/apache/nemo/common/Pair.java
index d14203f..b8189ad 100644
--- a/common/src/main/java/org/apache/nemo/common/Pair.java
+++ b/common/src/main/java/org/apache/nemo/common/Pair.java
@@ -28,6 +28,7 @@
* @param <B> type of the right element.
*/
public final class Pair<A, B> implements Serializable {
+ // TODO #430: Pair elements should be serializable
private final A left;
private final B right;
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 6d5b232..dc38b9e 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -60,7 +60,7 @@
*/
private final class BytesDecoder implements Decoder<byte[]> {
- private final InputStream inputStream;
+ private final transient InputStream inputStream;
private boolean returnedArray;
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
index 0f6ea2e..00280ed 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/LoopVertex.java
@@ -88,7 +88,7 @@
that.iterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addIterativeIncomingEdge));
that.nonIterativeIncomingEdges.forEach((v, es) -> es.forEach(this::addNonIterativeIncomingEdge));
that.dagOutgoingEdges.forEach(((v, es) -> es.forEach(this::addDagOutgoingEdge)));
- that.edgeWithLoopToEdgeWithInternalVertex.forEach((eLoop, eInternal) -> this.mapEdgeWithLoop(eLoop, eInternal));
+ that.edgeWithLoopToEdgeWithInternalVertex.forEach(this::mapEdgeWithLoop);
this.maxNumberOfIterations = that.maxNumberOfIterations;
this.terminationCondition = that.terminationCondition;
}
@@ -106,7 +106,7 @@
}
/**
- * @return the DAG of rthe LoopVertex
+ * @return the DAG of the LoopVertex
*/
public DAG<IRVertex, IREdge> getDAG() {
return builder.buildWithoutSourceSinkCheck();
diff --git a/common/src/test/java/org/apache/nemo/common/DAGTest.java b/common/src/test/java/org/apache/nemo/common/DAGTest.java
index 91708f2..60e7f15 100644
--- a/common/src/test/java/org/apache/nemo/common/DAGTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DAGTest.java
@@ -138,7 +138,7 @@
assertEquals(descendants.size(), 0);
descendants = dag.getDescendants("2");
- assertEquals(descendants.size(), 1);
+ assertEquals(1, descendants.size());
assertTrue(descendants.contains(new IntegerVertex(3)));
}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
index c8fe064..9915760 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/InMemorySideInputReader.java
@@ -109,8 +109,8 @@
this.curWatermark = newWatermark;
// TODO #282: Handle late data
- inMemorySideInputs.entrySet().removeIf(entry -> {
- return entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark; // Discard old sideinputs.
- });
+ inMemorySideInputs.entrySet().removeIf(entry ->
+ entry.getKey().right().maxTimestamp().getMillis() <= this.curWatermark // Discard old sideinputs.
+ );
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index d4e3185..32d4d11 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -78,7 +78,7 @@
*/
@Override
public void write(final K key,
- final Object element) throws BlockWriteException {
+ final Object element) {
if (committed) {
throw new BlockWriteException(new Throwable("The partition is already committed!"));
} else {
@@ -104,7 +104,7 @@
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) throws BlockWriteException {
+ public void writePartitions(final Iterable<NonSerializedPartition<K>> partitions) {
if (!committed) {
try {
final Iterable<SerializedPartition<K>> convertedPartitions = DataUtil.convertToSerPartitions(
@@ -127,7 +127,7 @@
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) throws BlockWriteException {
+ public void writeSerializedPartitions(final Iterable<SerializedPartition<K>> partitions) {
if (!committed) {
partitions.forEach(serializedPartitions::add);
} else {
@@ -145,7 +145,7 @@
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
- public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) throws BlockFetchException {
+ public Iterable<NonSerializedPartition<K>> readPartitions(final KeyRange keyRange) {
try {
return DataUtil.convertToNonSerPartitions(serializer, readSerializedPartitions(keyRange));
} catch (final IOException e) {
@@ -162,7 +162,7 @@
* @throws BlockFetchException for any error occurred while trying to fetch a block.
*/
@Override
- public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) throws BlockFetchException {
+ public Iterable<SerializedPartition<K>> readSerializedPartitions(final KeyRange keyRange) {
if (committed) {
final List<SerializedPartition<K>> partitionsInRange = new ArrayList<>();
serializedPartitions.forEach(serializedPartition -> {
@@ -186,7 +186,7 @@
* @throws BlockWriteException for any error occurred while trying to write a block.
*/
@Override
- public synchronized Optional<Map<K, Long>> commit() throws BlockWriteException {
+ public synchronized Optional<Map<K, Long>> commit() {
try {
if (!committed) {
commitPartitions();
@@ -213,7 +213,7 @@
* Commits all un-committed partitions.
*/
@Override
- public synchronized void commitPartitions() throws BlockWriteException {
+ public synchronized void commitPartitions() {
try {
for (final SerializedPartition<K> partition : nonCommittedPartitionsMap.values()) {
partition.commit();