[NEMO-321] Fix the data skew pass metric mismatch (#188)
JIRA: [NEMO-321: Fix the data skew pass metric mismatch](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-321)
**Major changes:**
- Gives `DataSkewRuntimePass` access to the partitioning logic by moving the `Partitioner` interface and its implementations from runtime executor to runtime common.
- The mismatch between the data metric produced by `MetricCollectionVertex` and the data metric understood by `DataSkewRuntimePass` is caused by the fact that the partitioning logic is hidden from both components.
**Minor changes to note:**
- Adds an empty edge from every metric aggregation vertex to the next stage (as a control dependency) to delay the next stage's scheduling. Currently, the order in which these stages are scheduled is not deterministic.
- Fixes the location of the `MetricCollection` property.
- Removes the `DynamicOptimization` property (not needed).
- Re-enables the skewness-aware scheduling.
**Tests for the changes:**
- Existing integration test and unit tests.
**Other comments:**
- N/A.
Closes #188
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
index 5f18a35..c9749a8 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
@@ -21,7 +21,7 @@
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
/**
- * MetricCollection ExecutionProperty.
+ * MetricCollection ExecutionProperty that indicates the edge of which data metric will be collected.
*/
public final class MetricCollectionProperty extends EdgeExecutionProperty<MetricCollectionProperty.Value> {
/**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
deleted file mode 100644
index e1ec738..0000000
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common.ir.vertex.executionproperty;
-
-import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
-
-/**
- * DynamicOptimizationType ExecutionProperty.
- */
-public final class DynamicOptimizationProperty extends VertexExecutionProperty<DynamicOptimizationProperty.Value> {
- /**
- * Constructor.
- * @param value value of the execution property.
- */
- private DynamicOptimizationProperty(final Value value) {
- super(value);
- }
-
- /**
- * Static method exposing the constructor.
- * @param value value of the new execution property.
- * @return the newly created execution property.
- */
- public static DynamicOptimizationProperty of(final Value value) {
- return new DynamicOptimizationProperty(value);
- }
-
- /**
- * Possible values of DynamicOptimization ExecutionProperty.
- */
- public enum Value {
- DataSkewRuntimePass
- }
-}
diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 1e950be..2a0fe36 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -26,10 +26,7 @@
import org.apache.nemo.common.ir.OutputCollector;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.OperatorVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
@@ -98,11 +95,15 @@
shuffleEdgeBetweenT1AndT2.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
shuffleEdgeBetweenT1AndT2.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
shuffleEdgeBetweenT1AndT2.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+ shuffleEdgeBetweenT1AndT2.setProperty(KeyEncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+ shuffleEdgeBetweenT1AndT2.setProperty(KeyDecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
final IREdge shuffleEdgeBetweenT3AndT4 = new IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4);
shuffleEdgeBetweenT3AndT4.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
shuffleEdgeBetweenT3AndT4.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
shuffleEdgeBetweenT3AndT4.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+ shuffleEdgeBetweenT3AndT4.setProperty(KeyEncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+ shuffleEdgeBetweenT3AndT4.setProperty(KeyDecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
dagBuilder.addVertex(s);
dagBuilder.addVertex(t1);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index 824c99b..8207320 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -29,11 +29,10 @@
final class BeamKeyExtractor implements KeyExtractor {
@Override
public Object extractKey(final Object element) {
- final WindowedValue windowedValue = (WindowedValue) element;
- final Object value = windowedValue.getValue();
- if (value instanceof KV) {
+ final Object valueToExtract = element instanceof WindowedValue ? ((WindowedValue) element).getValue() : element;
+ if (valueToExtract instanceof KV) {
// Handle null keys, since Beam allows KV with null keys.
- final Object key = ((KV) value).getKey();
+ final Object key = ((KV) valueToExtract).getKey();
return key == null ? 0 : key;
} else {
return element;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
deleted file mode 100644
index 0febda5..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * Pass to annotate the IR DAG for skew handling.
- *
- * It specifies the target of dynamic optimization for skew handling
- * by setting appropriate {@link MetricCollectionProperty} to
- * outgoing shuffle edges from vertices with {@link MetricCollectTransform}.
- */
-@Annotates(MetricCollectionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class SkewMetricCollectionPass extends AnnotatingPass {
- /**
- * Default constructor.
- */
- public SkewMetricCollectionPass() {
- super(SkewMetricCollectionPass.class);
- }
-
- @Override
- public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- dag.topologicalDo(v -> {
- // we only care about metric collection vertices.
- if (v instanceof OperatorVertex
- && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform) {
- dag.getOutgoingEdgesOf(v).forEach(edge -> {
- // double checking.
- if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
- .equals(CommunicationPatternProperty.Value.Shuffle)) {
- edge.setPropertyPermanently(MetricCollectionProperty.of(
- MetricCollectionProperty.Value.DataSkewRuntimePass));
- }
- });
- }
- });
- return dag;
- }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index b9f4705..45401e0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -23,12 +23,8 @@
import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-import java.util.List;
-
/**
* Transient resource pass for tagging edges with {@link PartitionerProperty}.
*/
@@ -44,19 +40,13 @@
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- dag.getVertices().forEach(v -> {
- if (v instanceof OperatorVertex
- && ((OperatorVertex) v).getTransform() instanceof AggregateMetricTransform) {
- final List<IREdge> outEdges = dag.getOutgoingEdgesOf(v);
- outEdges.forEach(edge -> {
- // double checking.
- if (MetricCollectionProperty.Value.DataSkewRuntimePass
- .equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
- edge.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
- }
- });
- }
- });
+ dag.getVertices()
+ .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
+ .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
+ .forEach(skewEdge -> skewEdge
+ .setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner))
+ )
+ );
return dag;
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 784ba64..425c939 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -20,22 +20,20 @@
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
-import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
-
-import java.util.List;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
/**
* Pass to annotate the IR DAG for skew handling.
*
- * It marks children and descendents of vertex with {@link MetricCollectTransform},
+ * It marks children and descendents of vertex with {@link ResourceSkewedDataProperty},
* which collects task-level statistics used for dynamic optimization,
* with {@link ResourceSkewedDataProperty} to perform skewness-aware scheduling.
*/
-@Annotates(DynamicOptimizationProperty.class)
+@Annotates(ResourceSkewedDataProperty.class)
+@Requires(MetricCollectionProperty.class)
public final class SkewResourceSkewedDataPass extends AnnotatingPass {
/**
* Default constructor.
@@ -44,40 +42,19 @@
super(SkewResourceSkewedDataPass.class);
}
- /**
- * @param dag that contains the {@code v}.
- * @param v to inspect.
- * @return whether or not the vertex has parent with MetricCollectTransform.
- */
- private boolean hasParentWithMetricCollectTransform(final DAG<IRVertex, IREdge> dag,
- final IRVertex v) {
- List<IRVertex> parents = dag.getParents(v.getId());
- for (IRVertex parent : parents) {
- if (parent instanceof OperatorVertex
- && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform) {
- return true;
- }
- }
- return false;
- }
-
@Override
public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
- dag.getVertices().stream()
- .filter(v -> v instanceof OperatorVertex
- && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
- .forEach(v -> v.setProperty(DynamicOptimizationProperty
- .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
-
- dag.getVertices().stream()
- .filter(v -> hasParentWithMetricCollectTransform(dag, v)
- && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
- .forEach(childV -> {
- childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
- dag.getDescendants(childV.getId()).forEach(descendentV -> {
+ dag.getVertices()
+ .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
+ .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
+ .forEach(skewEdge -> {
+ final IRVertex dstV = skewEdge.getDst();
+ dstV.setProperty(ResourceSkewedDataProperty.of(true));
+ dag.getDescendants(dstV.getId()).forEach(descendentV -> {
descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
});
- });
+ })
+ );
return dag;
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
index bfae788..2ca955c 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
@@ -33,8 +33,8 @@
public SkewCompositePass() {
super(Arrays.asList(
new SkewReshapingPass(),
- new SkewResourceSkewedDataPass(),
- new SkewMetricCollectionPass()
+ new SkewPartitionerPass(),
+ new SkewResourceSkewedDataPass()
));
}
}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
index dd80faa..85af2dc 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
@@ -21,6 +21,7 @@
import org.apache.nemo.common.ir.executionproperty.ExecutionProperty;
import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
import java.util.Arrays;
import java.util.HashSet;
@@ -31,6 +32,7 @@
* It is ensured by the compiler that no execution properties are modified by a ReshapingPass.
*/
public abstract class ReshapingPass extends CompileTimePass {
+ private final Set<Class<? extends ExecutionProperty>> executionPropertiesToAnnotate;
private final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties;
/**
@@ -41,6 +43,18 @@
final Requires requires = cls.getAnnotation(Requires.class);
this.prerequisiteExecutionProperties = requires == null
? new HashSet<>() : new HashSet<>(Arrays.asList(requires.value()));
+
+ final Annotates annotates = cls.getAnnotation(Annotates.class);
+ this.executionPropertiesToAnnotate = annotates == null
+ ? new HashSet<>() : new HashSet<>(Arrays.asList(annotates.value()));
+ }
+
+ /**
+ * Getter for the execution properties to annotate through the pass.
+ * @return key of execution properties to annotate through the pass.
+ */
+ public final Set<Class<? extends ExecutionProperty>> getExecutionPropertiesToAnnotate() {
+ return executionPropertiesToAnnotate;
}
/**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index 0236b67..da436d9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -35,6 +35,7 @@
import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
import org.apache.nemo.compiler.optimizer.PairKeyExtractor;
import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,7 @@
* 2) Stage-level statistic aggregation is done via vertex with {@link AggregateMetricTransform}
* inserted before shuffle edges.
* */
+@Annotates(MetricCollectionProperty.class)
@Requires(CommunicationPatternProperty.class)
public final class SkewReshapingPass extends ReshapingPass {
private static final Logger LOG = LoggerFactory.getLogger(SkewReshapingPass.class.getName());
@@ -95,10 +97,17 @@
final IREdge edgeToOriginalDstV =
new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
+ edgeToOriginalDstV.setPropertyPermanently(
+ MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
builder.connectVertices(edgeToMCV);
builder.connectVertices(edgeToABV);
builder.connectVertices(edgeToOriginalDstV);
+
+ // Add an control dependency (no output)
+ final IREdge emptyEdge =
+ new IREdge(CommunicationPatternProperty.Value.BroadCast, abv, v);
+ builder.connectVertices(emptyEdge);
} else {
builder.connectVertices(edge);
}
@@ -199,7 +208,7 @@
final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, mcv, abv);
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
- newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+ newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
newEdge.setProperty(AdditionalOutputTagProperty.of(ADDITIONAL_OUTPUT_TAG));
@@ -210,12 +219,13 @@
&& edge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
final EncoderFactory keyEncoderFactory = edge.getPropertyValue(KeyEncoderProperty.class).get();
final DecoderFactory keyDecoderFactory = edge.getPropertyValue(KeyDecoderProperty.class).get();
- newEdge.setProperty(EncoderProperty.of(PairEncoderFactory.of(keyEncoderFactory, LongEncoderFactory.of())));
- newEdge.setProperty(DecoderProperty.of(PairDecoderFactory.of(keyDecoderFactory, LongDecoderFactory.of())));
+ newEdge.setPropertyPermanently(
+ EncoderProperty.of(PairEncoderFactory.of(keyEncoderFactory, LongEncoderFactory.of())));
+ newEdge.setPropertyPermanently(
+ DecoderProperty.of(PairDecoderFactory.of(keyDecoderFactory, LongDecoderFactory.of())));
} else {
// If not specified, follow encoder/decoder of the given shuffle edge.
- newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
- newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
+ throw new RuntimeException("Skew optimization request for none key - value format data!");
}
return newEdge;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 2a48007..abb0bcf 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -34,8 +34,7 @@
public final class DataSkewPolicy implements Policy {
public static final PolicyBuilder BUILDER =
new PolicyBuilder()
- .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
- new SkewCompositePass())
+ .registerRuntimePass(new DataSkewRuntimePass(), new SkewCompositePass())
.registerCompileTimePass(new LoopOptimizationCompositePass())
.registerCompileTimePass(new DefaultCompositePass());
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
index fe413a2..770cf73 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -32,6 +32,7 @@
import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.ReshapingPass;
import org.apache.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
import java.util.*;
@@ -85,6 +86,9 @@
if (compileTimePass instanceof AnnotatingPass) {
final AnnotatingPass annotatingPass = (AnnotatingPass) compileTimePass;
this.annotatedExecutionProperties.addAll(annotatingPass.getExecutionPropertiesToAnnotate());
+ } else if (compileTimePass instanceof ReshapingPass) {
+ final ReshapingPass reshapingPass = (ReshapingPass) compileTimePass;
+ this.annotatedExecutionProperties.addAll(reshapingPass.getExecutionPropertiesToAnnotate());
}
this.compileTimePasses.add(compileTimePass);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
index 8ef91e5..46937e9 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
@@ -101,8 +101,13 @@
processedDAG.filterVertices(v -> v instanceof OperatorVertex
&& ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
.forEach(metricV -> {
- List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
- reducerV.forEach(rV -> assertTrue(rV.getPropertyValue(ResourceSkewedDataProperty.class).get()));
- });
+ final List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
+ reducerV.forEach(rV -> {
+ if (rV instanceof OperatorVertex &&
+ !(((OperatorVertex) rV).getTransform() instanceof AggregateMetricTransform)) {
+ assertTrue(rV.getPropertyValue(ResourceSkewedDataProperty.class).get());
+ }
+ });
+ });
}
}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 1bdb7c9..7b31372 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -211,19 +211,6 @@
}
/**
- * Hash range multiplier.
- * If we need to split or recombine an output data from a task after it is stored,
- * we multiply the hash range with this factor in advance
- * to prevent the extra deserialize - rehash - serialize process.
- * In these cases, the hash range will be (hash range multiplier X destination task parallelism).
- * The reason why we do not divide the output into a fixed number is that the fixed number can be smaller than
- * the destination task parallelism.
- */
- @NamedParameter(doc = "Hash range multiplier", short_name = "hash_range_multiplier", default_value = "10")
- public final class HashRangeMultiplier implements Name<Integer> {
- }
-
- /**
* The TCP port to which local block transfer binds. 0 means random port.
*/
@NamedParameter(doc = "Port to which PartitionTransport binds (0 means random port)",
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 63e131d..2f6c998 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -23,10 +23,13 @@
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.eventhandler.RuntimeEventHandler;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
+import org.apache.nemo.runtime.common.partitioner.DataSkewHashPartitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
@@ -44,29 +47,39 @@
*/
public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<Object, Long>>> {
private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
+ private static final int DEFAULT_NUM_SKEWED_KEYS = 1;
+ /*
+ * Hash range multiplier.
+ * If we need to split or recombine an output data from a task after it is stored,
+ * we multiply the hash range with this factor in advance
+ * to prevent the extra deserialize - rehash - serialize process.
+ * In these cases, the hash range will be (hash range multiplier X destination task parallelism).
+ * The reason why we do not divide the output into a fixed number is that the fixed number can be smaller than
+ * the destination task parallelism.
+ */
+ public static final int HASH_RANGE_MULTIPLIER = 10;
+
private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
// Skewed keys denote for top n keys in terms of partition size.
- public static final int DEFAULT_NUM_SKEWED_KEYS = 1;
- private int numSkewedKeys;
+ private final int numSkewedKeys;
/**
- * Constructor.
+ * Constructor without expected number of skewed keys.
*/
public DataSkewRuntimePass() {
- this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
- this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+ this(DEFAULT_NUM_SKEWED_KEYS);
}
+ /**
+ * Constructor with expected number of skewed keys.
+ *
+ * @param numOfSkewedKeys the expected number of skewed keys.
+ */
public DataSkewRuntimePass(final int numOfSkewedKeys) {
- this();
+ this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
this.numSkewedKeys = numOfSkewedKeys;
}
- public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
- numSkewedKeys = numOfSkewedKeys;
- return this;
- }
-
@Override
public Set<Class<? extends RuntimeEventHandler>> getEventHandlerClasses() {
return this.eventHandlers;
@@ -79,9 +92,15 @@
// Get number of evaluators of the next stage (number of blocks).
final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).
orElseThrow(() -> new RuntimeException("No parallelism on a vertex"));
+ if (!PartitionerProperty.Value.DataSkewHashPartitioner
+ .equals(targetEdge.getPropertyValue(PartitionerProperty.class)
+ .orElseThrow(() -> new RuntimeException("No partitioner property!")))) {
+ throw new RuntimeException("Invalid partitioner is assigned to the target edge!");
+ }
+ final DataSkewHashPartitioner partitioner = (DataSkewHashPartitioner) Partitioner.getPartitioner(targetEdge);
// Calculate keyRanges.
- final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+ final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism, partitioner);
final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
for (int i = 0; i < dstParallelism; i++) {
taskIdxToKeyRange.put(i, keyRanges.get(i));
@@ -129,25 +148,39 @@
/**
* Evenly distribute the skewed data to the destination tasks.
- * Partition denotes for a keyed portion of a Task output, whose key is a key.
- * Using a map of key to partition size, this method groups the given partitions
- * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
+ * Partition denotes for a keyed portion of a Task output.
+ * Using a map of actual data key to count, this method gets the size of each the given partitions and
+ * redistribute the key range of partitions with approximate size of (total size of partitions / the number of tasks).
+ * Assumption: the returned key of the partitioner is always 0 or positive integer.
*
- * @param keyToPartitionSizeMap a map of key to partition size.
+ * @param keyToCountMap a map of actual key to count.
* @param dstParallelism the number of tasks that receive this data as input.
+ * @param partitioner the partitioner.
* @return the list of key ranges calculated.
*/
@VisibleForTesting
- public List<KeyRange> calculateKeyRanges(final Map<Object, Long> keyToPartitionSizeMap,
- final Integer dstParallelism) {
- final List<Long> partitionSizeList = new ArrayList<>();
- keyToPartitionSizeMap.forEach((k, v) -> partitionSizeList.add(v));
+ public List<KeyRange> calculateKeyRanges(final Map<Object, Long> keyToCountMap,
+ final Integer dstParallelism,
+ final Partitioner<Integer> partitioner) {
+ final Map<Integer, Long> partitionKeyToPartitionCount = new HashMap<>();
+ int lastKey = 0;
+ // Aggregate the counts for each "partition key" assigned by the Partitioner.
- // Get the last index.
- final int lastKey = partitionSizeList.size() - 1;
+ for (final Map.Entry<Object, Long> entry : keyToCountMap.entrySet()) {
+ final int partitionKey = partitioner.partition(entry.getKey());
+ lastKey = Math.max(lastKey, partitionKey);
+ partitionKeyToPartitionCount.compute(partitionKey,
+ (existPartitionKey, prevCount) -> (prevCount == null) ? entry.getValue() : prevCount + entry.getValue());
+ }
+
+ final List<Long> partitionSizeList = new ArrayList<>(lastKey + 1);
+ for (int i = 0; i <= lastKey; i++) {
+ final long countsForKey = partitionKeyToPartitionCount.getOrDefault(i, 0L);
+ partitionSizeList.add(countsForKey);
+ }
// Identify skewed sizes, which is top numSkewedKeys number of keys.
- List<Long> skewedSizes = identifySkewedKeys(partitionSizeList);
+ final List<Long> skewedSizes = identifySkewedKeys(partitionSizeList);
// Calculate the ideal size for each destination task.
final Long totalSize = partitionSizeList.stream().mapToLong(n -> n).sum(); // get total size
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
similarity index 87%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
index a79defc..910a4b3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
@@ -16,9 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@
* When we need to split or recombine the output data from a task after it is stored,
* we multiply the hash range with a multiplier, which is commonly-known by the source and destination tasks,
* to prevent the extra deserialize - rehash - serialize process.
- * For more information, please check {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
+ * For more information, please check {@link DataSkewRuntimePass#HASH_RANGE_MULTIPLIER}.
*/
public final class DataSkewHashPartitioner implements Partitioner<Integer> {
private static final Logger LOG = LoggerFactory.getLogger(DataSkewHashPartitioner.class.getName());
@@ -43,17 +44,15 @@
/**
* Constructor.
*
- * @param hashRangeMultiplier the hash range multiplier.
* @param dstParallelism the number of destination tasks.
* @param keyExtractor the key extractor that extracts keys from elements.
*/
- public DataSkewHashPartitioner(final int hashRangeMultiplier,
- final int dstParallelism,
+ public DataSkewHashPartitioner(final int dstParallelism,
final KeyExtractor keyExtractor) {
this.keyExtractor = keyExtractor;
// For this hash range, please check the description of HashRangeMultiplier in JobConf.
// For actual hash range to use, we calculate a prime number right next to the desired hash range.
- this.hashRangeBase = new BigInteger(String.valueOf(dstParallelism * hashRangeMultiplier));
+ this.hashRangeBase = new BigInteger(String.valueOf(dstParallelism * DataSkewRuntimePass.HASH_RANGE_MULTIPLIER));
this.hashRange = hashRangeBase.nextProbablePrime().intValue();
LOG.info("hashRangeBase {} resulting hashRange {}", hashRangeBase, hashRange);
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
similarity index 94%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
index 4667702..bf476a8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
import java.lang.annotation.*;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
similarity index 96%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index 43d11cf..e9504c2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
/**
* An implementation of {@link Partitioner} which assigns a dedicated key per an output data from a task.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
similarity index 96%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
index 175a4f8..605146f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
import org.apache.nemo.common.KeyExtractor;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
similarity index 86%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
index b6253d2..dc8d7e6 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
/**
* An implementation of {@link Partitioner} which makes an output data
- * from a source task to a single {@link org.apache.nemo.runtime.executor.data.partition.Partition}.
+ * from a source task to a single partition.
*/
public final class IntactPartitioner implements Partitioner<Integer> {
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java
new file mode 100644
index 0000000..eb99d7a
--- /dev/null
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.partitioner;
+
+import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.common.exception.UnsupportedPartitionerException;
+import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+
+import java.io.Serializable;
+
+/**
+ * This interface represents the way of partitioning output data from a source task.
+ * It takes an element and designates the key of the partition to write the element to,
+ * according to the number of destination tasks, the key of each element, etc.
+ *
+ * @param <K> the key type of the partition to write.
+ */
+public interface Partitioner<K extends Serializable> {
+
+ /**
+ * Divides the output data from a task into multiple blocks.
+ *
+ * @param element the output element from a source task.
+ * @return the key of the partition in the block to write the element.
+ */
+ K partition(Object element);
+
+ /**
+ * Gets appropriate partitioner for an edge.
+ *
+ * @param runtimeEdge the runtime edge.
+ * @return the partitioner for the edge.
+ */
+ static Partitioner getPartitioner(final RuntimeEdge runtimeEdge) {
+ final StageEdge stageEdge = (StageEdge) runtimeEdge;
+ final PartitionerProperty.Value partitionerPropertyValue =
+ (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
+ final int dstParallelism =
+ stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+ .orElseThrow(() -> new RuntimeException("No parallelism in edge " + runtimeEdge.getId()));
+
+ final Partitioner partitioner;
+ switch (partitionerPropertyValue) {
+ case IntactPartitioner:
+ partitioner = new IntactPartitioner();
+ break;
+ case HashPartitioner:
+ final KeyExtractor hashKeyExtractor =
+ (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+ partitioner = new HashPartitioner(dstParallelism, hashKeyExtractor);
+ break;
+ case DataSkewHashPartitioner:
+ final KeyExtractor dataSkewKeyExtractor =
+ (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+ partitioner = new DataSkewHashPartitioner(dstParallelism, dataSkewKeyExtractor);
+ break;
+ case DedicatedKeyPerElementPartitioner:
+ partitioner = new DedicatedKeyPerElementPartitioner();
+ break;
+ default:
+ throw new UnsupportedPartitionerException(
+ new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
+ }
+ return partitioner;
+ }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index bbaf578..39f4801 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -25,7 +25,6 @@
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
import org.apache.nemo.common.ir.vertex.*;
-import org.apache.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
import org.apache.nemo.conf.JobConf;
@@ -63,7 +62,6 @@
@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
this.dagDirectory = dagDirectory;
this.stagePartitioner = stagePartitioner;
- stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
}
/**
diff --git a/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 49327cd..605c5c3 100644
--- a/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -19,7 +19,10 @@
package org.apache.nemo.runtime.common.optimizer.pass.runtime;
import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyExtractor;
import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.common.partitioner.HashPartitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
import org.junit.Before;
import org.junit.Test;
@@ -48,9 +51,11 @@
@Test
public void testDataSkewDynamicOptimizationPass() {
final Integer taskNum = 5;
+ final KeyExtractor asIsExtractor = new AsIsKeyExtractor();
+ final Partitioner partitioner = new HashPartitioner(taskNum, asIsExtractor);
final List<KeyRange> keyRanges =
- new DataSkewRuntimePass().setNumSkewedKeys(2).calculateKeyRanges(testMetricData, taskNum);
+ new DataSkewRuntimePass(2).calculateKeyRanges(testMetricData, taskNum, partitioner);
// Test whether it correctly redistributed hash ranges.
assertEquals(0, keyRanges.get(0).rangeBeginInclusive());
@@ -63,7 +68,7 @@
assertEquals(5, keyRanges.get(3).rangeEndExclusive());
assertEquals(5, keyRanges.get(4).rangeBeginInclusive());
assertEquals(5, keyRanges.get(4).rangeEndExclusive());
-
+
// Test whether it caught the provided skewness.
assertEquals(false, ((HashRange)keyRanges.get(0)).isSkewed());
assertEquals(false, ((HashRange)keyRanges.get(1)).isSkewed());
@@ -89,4 +94,18 @@
key++;
}
}
+
+ /**
+ * Custom {@link KeyExtractor} which returns the element as is.
+ */
+ private final class AsIsKeyExtractor implements KeyExtractor {
+
+ /**
+ * @see KeyExtractor#extractKey(Object).
+ */
+ @Override
+ public Object extractKey(final Object element) {
+ return element;
+ }
+ }
}
diff --git a/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java b/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
index cdfafe3..3c0cadb 100644
--- a/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
+++ b/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -49,7 +49,7 @@
@Before
public void setup() throws InjectionException {
stagePartitioner = Tang.Factory.getTang().newInjector().getInstance(StagePartitioner.class);
- stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
+ stagePartitioner.addIgnoredPropertyKey(IgnoreSchedulingTempDataReceiverProperty.class);
}
/**
@@ -151,7 +151,7 @@
public void testNotSplitByIgnoredProperty() {
final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
final IRVertex v0 = newVertex(1, 0,
- Arrays.asList(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+ Arrays.asList(IgnoreSchedulingTempDataReceiverProperty.of()));
final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
dagBuilder.addVertex(v0);
dagBuilder.addVertex(v1);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java
deleted file mode 100644
index bdaf05a..0000000
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.runtime.executor.data.partitioner;
-
-import java.io.Serializable;
-
-/**
- * This interface represents the way of partitioning output data from a source task.
- * It takes an element and designates key of {@link org.apache.nemo.runtime.executor.data.partition.Partition}
- * to write the element, according to the number of destination tasks, the key of each element, etc.
- * @param <K> the key type of the partition to write.
- */
-public interface Partitioner<K extends Serializable> {
-
- /**
- * Divides the output data from a task into multiple blocks.
- *
- * @param element the output element from a source task.
- * @return the key of the partition in the block to write the element.
- */
- K partition(Object element);
-}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 4b85087..9590911 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -26,7 +26,7 @@
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.block.Block;
-import org.apache.nemo.runtime.executor.data.partitioner.*;
+import org.apache.nemo.runtime.common.partitioner.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,21 +53,19 @@
/**
* Constructor.
*
- * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
* @param srcTaskId the id of the source task.
* @param dstIrVertex the destination IR vertex.
* @param runtimeEdge the {@link RuntimeEdge}.
* @param blockManagerWorker the {@link BlockManagerWorker}.
*/
- BlockOutputWriter(final int hashRangeMultiplier,
- final String srcTaskId,
+ BlockOutputWriter(final String srcTaskId,
final IRVertex dstIrVertex,
final RuntimeEdge<?> runtimeEdge,
final BlockManagerWorker blockManagerWorker) {
this.runtimeEdge = runtimeEdge;
this.dstIrVertex = dstIrVertex;
- this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+ this.partitioner = Partitioner.getPartitioner(runtimeEdge);
this.blockManagerWorker = blockManagerWorker;
this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class)
.orElseThrow(() -> new RuntimeException("No data store property on the edge"));
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
index bbe4c72..8a5934b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -61,6 +61,7 @@
aggregatedDynOptData.forEach((key, size) ->
partitionSizeEntries.add(
ControlMessage.PartitionSizeEntry.newBuilder()
+ // TODO #325: Add (de)serialization for non-string key types in data metric collection
.setKey(key == null ? NULL_KEY : String.valueOf(key))
.setSize(size)
.build())
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
index a90cc79..e3c4a0c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
@@ -19,13 +19,11 @@
package org.apache.nemo.runtime.executor.datatransfer;
import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.conf.JobConf;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
-import org.apache.reef.tang.annotations.Parameter;
import javax.inject.Inject;
import java.util.Optional;
@@ -36,13 +34,10 @@
public final class IntermediateDataIOFactory {
private final PipeManagerWorker pipeManagerWorker;
private final BlockManagerWorker blockManagerWorker;
- private final int hashRangeMultiplier;
@Inject
- private IntermediateDataIOFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
- final BlockManagerWorker blockManagerWorker,
+ private IntermediateDataIOFactory(final BlockManagerWorker blockManagerWorker,
final PipeManagerWorker pipeManagerWorker) {
- this.hashRangeMultiplier = hashRangeMultiplier;
this.blockManagerWorker = blockManagerWorker;
this.pipeManagerWorker = pipeManagerWorker;
}
@@ -57,11 +52,10 @@
public OutputWriter createWriter(final String srcTaskId,
final RuntimeEdge<?> runtimeEdge) {
if (isPipe(runtimeEdge)) {
- return new PipeOutputWriter(hashRangeMultiplier, srcTaskId, runtimeEdge, pipeManagerWorker);
+ return new PipeOutputWriter(srcTaskId, runtimeEdge, pipeManagerWorker);
} else {
final StageEdge stageEdge = (StageEdge) runtimeEdge;
- return new BlockOutputWriter(
- hashRangeMultiplier, srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
+ return new BlockOutputWriter(srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
}
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 301c95a..c976cf4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -18,15 +18,7 @@
*/
package org.apache.nemo.runtime.executor.datatransfer;
-import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.UnsupportedPartitionerException;
-import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.punctuation.Watermark;
-import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.runtime.executor.data.partitioner.*;
import java.util.*;
@@ -53,38 +45,4 @@
Optional<Long> getWrittenBytes();
void close();
-
-
- static Partitioner getPartitioner(final RuntimeEdge runtimeEdge,
- final int hashRangeMultiplier) {
- final StageEdge stageEdge = (StageEdge) runtimeEdge;
- final PartitionerProperty.Value partitionerPropertyValue =
- (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
- final int dstParallelism =
- stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get();
-
- final Partitioner partitioner;
- switch (partitionerPropertyValue) {
- case IntactPartitioner:
- partitioner = new IntactPartitioner();
- break;
- case HashPartitioner:
- final KeyExtractor hashKeyExtractor =
- (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
- partitioner = new HashPartitioner(dstParallelism, hashKeyExtractor);
- break;
- case DataSkewHashPartitioner:
- final KeyExtractor dataSkewKeyExtractor =
- (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
- partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, dataSkewKeyExtractor);
- break;
- case DedicatedKeyPerElementPartitioner:
- partitioner = new DedicatedKeyPerElementPartitioner();
- break;
- default:
- throw new UnsupportedPartitionerException(
- new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
- }
- return partitioner;
- }
}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index f937975..c8e0dbe 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -24,7 +24,7 @@
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
-import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,20 +54,18 @@
/**
* Constructor.
*
- * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
* @param srcTaskId the id of the source task.
* @param runtimeEdge the {@link RuntimeEdge}.
* @param pipeManagerWorker the pipe manager.
*/
- PipeOutputWriter(final int hashRangeMultiplier,
- final String srcTaskId,
+ PipeOutputWriter(final String srcTaskId,
final RuntimeEdge runtimeEdge,
final PipeManagerWorker pipeManagerWorker) {
this.initialized = false;
this.srcTaskId = srcTaskId;
this.pipeManagerWorker = pipeManagerWorker;
this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
- this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+ this.partitioner = Partitioner.getPartitioner(runtimeEdge);
this.runtimeEdge = runtimeEdge;
this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(srcTaskId);
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index b80a2ce..bb76bc5 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -449,7 +449,7 @@
}
// Get outgoing edges of that stage with MetricCollectionProperty
- List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
+ final List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
.getOutgoingEdgesOf(parentStages.get(0));
for (StageEdge edge : stageEdges) {
if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {