[NEMO-429] SWPP TEAM3 Code Smell Fix (#265)
JIRA: [NEMO-429: SWPP TEAM 3 Code Smell Fix](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-429)
**Major changes:**
- Fixed code smells(SWPP Code Smell Session)
diff --git a/common/src/test/java/org/apache/nemo/common/DAGTest.java b/common/src/test/java/org/apache/nemo/common/DAGTest.java
index e735fef..629d525 100644
--- a/common/src/test/java/org/apache/nemo/common/DAGTest.java
+++ b/common/src/test/java/org/apache/nemo/common/DAGTest.java
@@ -66,8 +66,8 @@
assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(1)).size(), 0);
assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(5)).size(), 0);
assertEquals(dag.getIncomingEdgesOf(new IntegerVertex(3)).size(), 1);
- assertEquals(dag.getOutgoingEdgesOf(new IntegerVertex(4)).size(), 1);
- assertEquals(dag.getTopologicalSort().size(), 5);
+ assertEquals(1, dag.getOutgoingEdgesOf(new IntegerVertex(4)).size());
+ assertEquals(5, dag.getTopologicalSort().size());
final List<IntegerVertex> topologicalOrder = dag.getTopologicalSort();
assertEquals(topologicalOrder.get(0).getValue(), 1);
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
index 9a66d72..2d356c0 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceByKeyTransform.java
@@ -39,6 +39,7 @@
public final class ReduceByKeyTransform<K, V> extends NoWatermarkEmitTransform<Tuple2<K, V>, Tuple2<K, V>> {
private static final Logger LOG = LoggerFactory.getLogger(ReduceByKeyTransform.class.getName());
+ // TODO #431: Handle states in Transforms better
private final Map<K, List<V>> keyToValues;
private final Function2<V, V, V> func;
private OutputCollector<Tuple2<K, V>> outputCollector;
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index b7a445f..3c5e2d7 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -34,6 +34,7 @@
public final class ReduceTransform<T> implements Transform<T, T> {
private final Function2<T, T, T> func;
private OutputCollector<T> outputCollector;
+ // TODO #431: Handle states in Transforms better
private T result;
/**
@@ -41,14 +42,15 @@
*
* @param func function to run for the reduce transform.
*/
+ // TODO #432: ReduceTransform Unit Test
public ReduceTransform(final Function2<T, T, T> func) {
this.func = func;
- this.result = null;
}
@Override
public void prepare(final Context context, final OutputCollector<T> oc) {
this.outputCollector = oc;
+ this.result = null;
}
@Override
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
index 00c1f52..c86b756 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/MetricUtils.java
@@ -74,6 +74,7 @@
public static final String POSTGRESQL_METADATA_DB_NAME =
"jdbc:postgresql://nemo-optimization.cabbufr3evny.us-west-2.rds.amazonaws.com:5432/nemo_optimization";
private static final String METADATA_TABLE_NAME = "nemo_optimization_meta";
+ private static final String SAVING_METADATA_FAIL_MSG = "Saving of Metadata to DB failed: ";
/**
* Private constructor.
@@ -166,7 +167,7 @@
try {
insertOrUpdateMetadata(c, "EP_KEY_METADATA", l, r);
} catch (SQLException e) {
- LOG.warn("Saving of Metadata to DB failed: ", e);
+ LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
});
LOG.info("EP Key Metadata saved to DB.");
@@ -177,14 +178,14 @@
try {
insertOrUpdateMetadata(c, "EP_METADATA", l.left() * 10000 + l.right(), r);
} catch (SQLException e) {
- LOG.warn("Saving of Metadata to DB failed: ", e);
+ LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
});
LOG.info("EP Metadata saved to DB.");
}
}
} catch (SQLException e) {
- LOG.warn("Saving of Metadata to DB failed: ", e);
+ LOG.warn(SAVING_METADATA_FAIL_MSG, e);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
index bd87031..6ffdf5b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/MemoryStore.java
@@ -63,7 +63,7 @@
* @throws BlockWriteException if fail to write.
*/
@Override
- public void writeBlock(final Block block) throws BlockWriteException {
+ public void writeBlock(final Block block) {
if (!(block instanceof NonSerializedMemoryBlock)) {
throw new BlockWriteException(new Throwable(
this.toString() + "only accept " + NonSerializedPartition.class.getName()));
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
index de46951..19b8382 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/stores/SerializedMemoryStore.java
@@ -62,7 +62,7 @@
* @throws BlockWriteException if fail to write.
*/
@Override
- public void writeBlock(final Block block) throws BlockWriteException {
+ public void writeBlock(final Block block) {
if (!(block instanceof SerializedMemoryBlock)) {
throw new BlockWriteException(new Throwable(
this.toString() + "only accept " + SerializedMemoryBlock.class.getName()));
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index 1466f4e..4c3d112 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -172,7 +172,7 @@
.map(edge -> edge.getExecutionProperties()
.get(MessageIdEdgeProperty.class)
.<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException(edge.getId())))
- .findFirst().<IllegalArgumentException>orElseThrow(() -> new IllegalArgumentException());
+ .findFirst().<IllegalArgumentException>orElseThrow(IllegalArgumentException::new);
// Type casting is needed. See: https://stackoverflow.com/a/40865318
return messageIds.iterator().next();
@@ -286,7 +286,7 @@
public void onSpeculativeExecutionCheck() {
MutableBoolean isNewCloneCreated = new MutableBoolean(false);
- selectEarliestSchedulableGroup().ifPresent(scheduleGroup -> {
+ selectEarliestSchedulableGroup().ifPresent(scheduleGroup ->
scheduleGroup.stream().map(Stage::getId).forEach(stageId -> {
final Stage stage = planStateManager.getPhysicalPlan().getStageDAG().getVertexById(stageId);
@@ -296,8 +296,8 @@
isNewCloneCreated.setValue(doSpeculativeExecution(stage, cloneConf));
}
});
- });
- });
+ })
+ );
if (isNewCloneCreated.booleanValue()) {
doSchedule(); // Do schedule the new clone.
@@ -513,7 +513,7 @@
final Stage stagePutOnHold = stageDag.getVertices().stream()
.filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
.findFirst()
- .orElseThrow(() -> new RuntimeException());
+ .orElseThrow(RuntimeException::new);
// Stage put on hold, i.e. stage with vertex containing MessageAggregatorTransform
// should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.