blob: 60ebc53229d5a095704c6cdb9c45ced105ba8667 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
public class ProtoConverters {
public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
TezTaskAttemptID taskAttemptID =
TezTaskAttemptID.fromString(taskSpecProto.getTaskAttemptIdString());
ProcessorDescriptor processorDescriptor = null;
if (taskSpecProto.hasProcessorDescriptor()) {
processorDescriptor = DagTypeConverters
.convertProcessorDescriptorFromDAGPlan(taskSpecProto.getProcessorDescriptor());
}
List<InputSpec> inputSpecList = new ArrayList<InputSpec>(taskSpecProto.getInputSpecsCount());
if (taskSpecProto.getInputSpecsCount() > 0) {
for (IOSpecProto inputSpecProto : taskSpecProto.getInputSpecsList()) {
inputSpecList.add(getInputSpecFromProto(inputSpecProto));
}
}
List<OutputSpec> outputSpecList =
new ArrayList<OutputSpec>(taskSpecProto.getOutputSpecsCount());
if (taskSpecProto.getOutputSpecsCount() > 0) {
for (IOSpecProto outputSpecProto : taskSpecProto.getOutputSpecsList()) {
outputSpecList.add(getOutputSpecFromProto(outputSpecProto));
}
}
List<GroupInputSpec> groupInputSpecs =
new ArrayList<GroupInputSpec>(taskSpecProto.getGroupedInputSpecsCount());
if (taskSpecProto.getGroupedInputSpecsCount() > 0) {
for (GroupInputSpecProto groupInputSpecProto : taskSpecProto.getGroupedInputSpecsList()) {
groupInputSpecs.add(getGroupInputSpecFromProto(groupInputSpecProto));
}
}
TaskSpec taskSpec =
new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
outputSpecList, groupInputSpecs);
return taskSpec;
}
public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
builder.setDagName(taskSpec.getDAGName());
builder.setVertexName(taskSpec.getVertexName());
builder.setVertexParallelism(taskSpec.getVertexParallelism());
if (taskSpec.getProcessorDescriptor() != null) {
builder.setProcessorDescriptor(
DagTypeConverters.convertToDAGPlan(taskSpec.getProcessorDescriptor()));
}
if (taskSpec.getInputs() != null && !taskSpec.getInputs().isEmpty()) {
for (InputSpec inputSpec : taskSpec.getInputs()) {
builder.addInputSpecs(convertInputSpecToProto(inputSpec));
}
}
if (taskSpec.getOutputs() != null && !taskSpec.getOutputs().isEmpty()) {
for (OutputSpec outputSpec : taskSpec.getOutputs()) {
builder.addOutputSpecs(convertOutputSpecToProto(outputSpec));
}
}
if (taskSpec.getGroupInputs() != null && !taskSpec.getGroupInputs().isEmpty()) {
for (GroupInputSpec groupInputSpec : taskSpec.getGroupInputs()) {
builder.addGroupedInputSpecs(convertGroupInputSpecToProto(groupInputSpec));
}
}
return builder.build();
}
public static InputSpec getInputSpecFromProto(IOSpecProto inputSpecProto) {
InputDescriptor inputDescriptor = null;
if (inputSpecProto.hasIoDescriptor()) {
inputDescriptor =
DagTypeConverters.convertInputDescriptorFromDAGPlan(inputSpecProto.getIoDescriptor());
}
InputSpec inputSpec = new InputSpec(inputSpecProto.getConnectedVertexName(), inputDescriptor,
inputSpecProto.getPhysicalEdgeCount());
return inputSpec;
}
public static IOSpecProto convertInputSpecToProto(InputSpec inputSpec) {
IOSpecProto.Builder builder = IOSpecProto.newBuilder();
if (inputSpec.getSourceVertexName() != null) {
builder.setConnectedVertexName(inputSpec.getSourceVertexName());
}
if (inputSpec.getInputDescriptor() != null) {
builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(inputSpec.getInputDescriptor()));
}
builder.setPhysicalEdgeCount(inputSpec.getPhysicalEdgeCount());
return builder.build();
}
public static OutputSpec getOutputSpecFromProto(IOSpecProto outputSpecProto) {
OutputDescriptor outputDescriptor = null;
if (outputSpecProto.hasIoDescriptor()) {
outputDescriptor =
DagTypeConverters.convertOutputDescriptorFromDAGPlan(outputSpecProto.getIoDescriptor());
}
OutputSpec outputSpec =
new OutputSpec(outputSpecProto.getConnectedVertexName(), outputDescriptor,
outputSpecProto.getPhysicalEdgeCount());
return outputSpec;
}
public static IOSpecProto convertOutputSpecToProto(OutputSpec outputSpec) {
IOSpecProto.Builder builder = IOSpecProto.newBuilder();
if (outputSpec.getDestinationVertexName() != null) {
builder.setConnectedVertexName(outputSpec.getDestinationVertexName());
}
if (outputSpec.getOutputDescriptor() != null) {
builder.setIoDescriptor(DagTypeConverters.convertToDAGPlan(outputSpec.getOutputDescriptor()));
}
builder.setPhysicalEdgeCount(outputSpec.getPhysicalEdgeCount());
return builder.build();
}
public static GroupInputSpec getGroupInputSpecFromProto(GroupInputSpecProto groupInputSpecProto) {
GroupInputSpec groupSpec = new GroupInputSpec(groupInputSpecProto.getGroupName(),
groupInputSpecProto.getGroupVerticesList(), DagTypeConverters
.convertInputDescriptorFromDAGPlan(groupInputSpecProto.getMergedInputDescriptor()));
return groupSpec;
}
public static GroupInputSpecProto convertGroupInputSpecToProto(GroupInputSpec groupInputSpec) {
GroupInputSpecProto.Builder builder = GroupInputSpecProto.newBuilder();
builder.setGroupName(groupInputSpec.getGroupName());
builder.addAllGroupVertices(groupInputSpec.getGroupVertices());
builder.setMergedInputDescriptor(
DagTypeConverters.convertToDAGPlan(groupInputSpec.getMergedInputDescriptor()));
return builder.build();
}
}