blob: 9614df8bb5300a67b4844a8d9ce6198749ebd6d6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.core.profiling;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.wayang.commons.util.profiledb.model.Measurement;
import org.apache.wayang.commons.util.profiledb.model.Type;
import org.apache.wayang.core.plan.executionplan.Channel;
import org.apache.wayang.core.plan.executionplan.ExecutionPlan;
import org.apache.wayang.core.plan.executionplan.ExecutionTask;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
/**
* This {@link Measurement} encapsulates an {@link ExecutionPlan}.
*/
@Type("execution-plan")
public class ExecutionPlanMeasurement extends Measurement {
/**
* Stores the {@link Channel}s of an {@link ExecutionPlan} as {@link ChannelNode}s.
*/
private List<ChannelNode> channels;
/**
* Stores the {@link ExecutionOperator}s of an {@link ExecutionPlan} as {@link OperatorNode}s.
*/
private List<OperatorNode> operators;
/**
* Stores the connections between {@link Channel}s and {@link ExecutionOperator}s of an {@link ExecutionPlan} as {@link Link}s.
*/
private List<Link> links;
/**
* Deserialization constructor.
*/
private ExecutionPlanMeasurement() {
}
private ExecutionPlanMeasurement(String id) {
super(id);
}
/**
* Creates a new instance.
*
* @param executionPlan that should be reflected in the new instance
* @param id ID for the new instance
* @return the new instance
*/
public static ExecutionPlanMeasurement capture(ExecutionPlan executionPlan, String id) {
ExecutionPlanMeasurement instance = new ExecutionPlanMeasurement(id);
// Collect the tasks of the plan.
Set<ExecutionTask> executionTasks = executionPlan.collectAllTasks();
// Initialize the new instance.
instance.operators = new ArrayList<>(executionTasks.size());
instance.channels = new ArrayList<>(executionTasks.size());
instance.links = new ArrayList<>(executionTasks.size());
// Keep track of already created ChannelNodes.
Map<Channel, ChannelNode> channelNodeMap = new HashMap<>(executionTasks.size());
// Go over the ExecutionTasks and create all ChannelNodes, OperatorNodes, and Links immediately.
int nextNodeId = 0;
for (ExecutionTask executionTask : executionTasks) {
// Create the OperatorNode.
ExecutionOperator operator = executionTask.getOperator();
OperatorNode operatorNode = new OperatorNode(
nextNodeId++,
operator.getClass().getCanonicalName(),
operator.getName(),
operator.getPlatform().getName()
);
instance.operators.add(operatorNode);
// Create inbound ChannelNodes and Links.
for (Channel inputChannel : executionTask.getInputChannels()) {
if (inputChannel == null) continue;
ChannelNode channelNode = channelNodeMap.get(inputChannel);
if (channelNode == null) {
channelNode = new ChannelNode(
nextNodeId++,
inputChannel.getClass().getCanonicalName(),
inputChannel.getDataSetType().getDataUnitType().getTypeClass().getName()
);
channelNodeMap.put(inputChannel, channelNode);
instance.channels.add(channelNode);
}
instance.links.add(new Link(channelNode.getId(), operatorNode.getId()));
}
// Create outbound ChannelNodes and Links.
for (Channel outputChannel : executionTask.getOutputChannels()) {
if (outputChannel == null) continue;
ChannelNode channelNode = channelNodeMap.get(outputChannel);
if (channelNode == null) {
channelNode = new ChannelNode(
nextNodeId++,
outputChannel.getClass().getCanonicalName(),
outputChannel.getDataSetType().getDataUnitType().getTypeClass().getName()
);
channelNodeMap.put(outputChannel, channelNode);
instance.channels.add(channelNode);
}
instance.links.add(new Link(operatorNode.getId(), channelNode.getId()));
}
}
return instance;
}
public List<ChannelNode> getChannels() {
return channels;
}
public List<OperatorNode> getOperators() {
return operators;
}
public List<Link> getLinks() {
return links;
}
/**
* Encapsulates a {@link Channel} of the {@link ExecutionPlan}.
*/
public static class ChannelNode {
/**
* ID of this instance to be used in {@link Link}s.
*/
private int id;
/**
* The type of the {@link Channel}.
*/
private String type;
/**
* The type of data quanta in the {@link Channel}.
*/
private String dataQuantaType;
/**
* Deserialization constructor.
*/
private ChannelNode() {
}
public ChannelNode(int id, String type, String dataQuantaType) {
this.id = id;
this.type = type;
this.dataQuantaType = dataQuantaType;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getDataQuantaType() {
return dataQuantaType;
}
public void setDataQuantaType(String dataQuantaType) {
this.dataQuantaType = dataQuantaType;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
/**
* Encapsulates a {@link Channel} of the {@link ExecutionPlan}.
*/
public static class OperatorNode {
/**
* ID of this instance to be used in {@link Link}s.
*/
private int id;
/**
* The type of the {@link ExecutionOperator}.
*/
private String type;
/**
* The name of the {@link ExecutionOperator}.
*/
private String name;
/**
* The name of the {@link org.apache.wayang.core.platform.Platform} of the {@link ExecutionOperator}.
*/
private String platform;
/**
* Deserialization constructor.
*/
private OperatorNode() {
}
public OperatorNode(int id, String type, String name, String platform) {
this.id = id;
this.type = type;
this.name = name;
this.platform = platform;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getPlatform() {
return platform;
}
public void setPlatform(String platform) {
this.platform = platform;
}
}
/**
* A directed link between an {@link OperatorNode} and a {@link ChannelNode} (in any order).
*/
public static class Link {
private int source, destination;
/**
* Deserialization constructor.
*/
private Link() {
}
public Link(int source, int destination) {
this.source = source;
this.destination = destination;
}
public int getSource() {
return source;
}
public void setSource(int source) {
this.source = source;
}
public int getDestination() {
return destination;
}
public void setDestination(int destination) {
this.destination = destination;
}
}
}