// blob: 343018045ccb3f02c4d1caf67df5dd4488b15d2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.common.plan;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.nemo.common.HashRange;
import org.apache.nemo.common.KeyRange;
import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionSetProperty;
import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
* This means that there can be multiple StageEdges between two Stages.
*/
public final class StageEdge extends RuntimeEdge<Stage> {
  private static final Logger LOG = LoggerFactory.getLogger(StageEdge.class);

  /**
   * The source {@link IRVertex}.
   * This belongs to the srcStage.
   */
  private final IRVertex srcVertex;

  /**
   * The destination {@link IRVertex}.
   * This belongs to the dstStage.
   */
  private final IRVertex dstVertex;

  /**
   * Constructor.
   *
   * @param runtimeEdgeId  id of the runtime edge.
   * @param edgeProperties edge execution properties.
   * @param srcVertex      source IRVertex in the srcStage of this edge.
   * @param dstVertex      destination IRVertex in the dstStage of this edge.
   * @param srcStage       source stage.
   * @param dstStage       destination stage.
   */
  @VisibleForTesting
  public StageEdge(final String runtimeEdgeId,
                   final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
                   final IRVertex srcVertex,
                   final IRVertex dstVertex,
                   final Stage srcStage,
                   final Stage dstStage) {
    super(runtimeEdgeId, edgeProperties, srcStage, dstStage);
    this.srcVertex = srcVertex;
    this.dstVertex = dstVertex;
  }

  /**
   * @return the source IR vertex of the edge.
   */
  public IRVertex getSrcIRVertex() {
    return srcVertex;
  }

  /**
   * @return the destination IR vertex of the edge.
   */
  public IRVertex getDstIRVertex() {
    return dstVertex;
  }

  /**
   * Builds a JSON object describing this edge: its id, its execution properties,
   * and the ids of the source and destination IR vertices.
   *
   * @return the JSON representation of this edge.
   */
  @Override
  public ObjectNode getPropertiesAsJsonNode() {
    final ObjectNode node = JsonNodeFactory.instance.objectNode();
    node.put("runtimeEdgeId", getId());
    node.set("executionProperties", getExecutionProperties().asJsonNode());
    node.put("externalSrcVertexId", srcVertex.getId());
    node.put("externalDstVertexId", dstVertex.getId());
    return node;
  }

  /**
   * @return the string form of {@link #getPropertiesAsJsonNode()}.
   */
  @Override
  public String toString() {
    return getPropertiesAsJsonNode().toString();
  }

  /**
   * Checks whether the given edge connects the same source and destination stages as this edge.
   * Only the stages are compared; the IR vertices are not.
   *
   * @param edge edge to compare.
   * @return whether or not the edge has the same itinerary
   */
  public Boolean hasSameItineraryAs(final StageEdge edge) {
    return getSrc().equals(edge.getSrc()) && getDst().equals(edge.getDst());
  }

  /**
   * Two stage edges are equal when they have equal execution properties and the same itinerary
   * (see {@link #hasSameItineraryAs(StageEdge)}). The edge id and the IR vertices do not take part
   * in the comparison.
   *
   * @param o the object to compare with.
   * @return {@code true} if {@code o} is a {@link StageEdge} equal to this one.
   */
  @Override
  public boolean equals(final Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final StageEdge stageEdge = (StageEdge) o;
    return getExecutionProperties().equals(stageEdge.getExecutionProperties()) && hasSameItineraryAs(stageEdge);
  }

  /**
   * Hashes the same components that {@link #equals(Object)} compares:
   * the source stage, the destination stage, and the execution properties.
   *
   * @return the hash code of this edge.
   */
  @Override
  public int hashCode() {
    return new HashCodeBuilder(17, 37)
      .append(getSrc().hashCode())
      .append(getDst().hashCode())
      .append(getExecutionProperties())
      .toHashCode();
  }

  /**
   * @return {@link CommunicationPatternProperty} value.
   * @throws RuntimeException if the property is not set on this edge.
   */
  public CommunicationPatternProperty.Value getDataCommunicationPattern() {
    return getExecutionProperties().get(CommunicationPatternProperty.class)
      .orElseThrow(() -> new RuntimeException(String.format(
        "CommunicationPatternProperty not set for %s", getId())));
  }

  /**
   * @return {@link DataFlowProperty} value.
   * @throws RuntimeException if the property is not set on this edge.
   */
  public DataFlowProperty.Value getDataFlowModel() {
    return getExecutionProperties().get(DataFlowProperty.class)
      .orElseThrow(() -> new RuntimeException(String.format(
        "DataFlowProperty not set for %s", getId())));
  }

  /**
   * Returns the key ranges of this edge.
   * If {@link PartitionSetProperty} is set, its value is returned as is. Otherwise a default list is
   * built with one entry per task index of the destination stage (see {@link #buildDefaultKeyRanges()}).
   *
   * @return the {@link PartitionSetProperty} value, or the default key ranges when it is not set.
   */
  public List<KeyRange> getKeyRanges() {
    // orElseGet defers building the default list until the property is known to be absent.
    return getExecutionProperties()
      .get(PartitionSetProperty.class)
      .orElseGet(this::buildDefaultKeyRanges);
  }

  /**
   * Builds the default key ranges: for every task index {@code i} below the parallelism of the
   * destination stage, the element at position {@code i} is {@code HashRange.of(i, i + 1)}.
   *
   * @return a newly created list of default key ranges.
   */
  private ArrayList<KeyRange> buildDefaultKeyRanges() {
    final int parallelism = getDst().getParallelism();
    // A non-positive parallelism yields an empty list, exactly as the loop below would.
    final ArrayList<KeyRange> defaultPartitionSet = new ArrayList<>(Math.max(parallelism, 0));
    for (int taskIndex = 0; taskIndex < parallelism; taskIndex++) {
      defaultPartitionSet.add(HashRange.of(taskIndex, taskIndex + 1));
    }
    return defaultPartitionSet;
  }
}