blob: f92647146708099366fd9b1a98d1f13bf9ff1405 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.history.utils;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.runtime.api.OutputCommitter;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Sets;
public class TestDAGUtils {
private DAGPlan createDAG() {
// Create a plan with 3 vertices: A, B, C. Group(A,B)->C
Configuration conf = new Configuration(false);
int dummyTaskCount = 1;
Resource dummyTaskResource = Resource.newInstance(1, 1);
org.apache.tez.dag.api.Vertex v1 = new org.apache.tez.dag.api.Vertex("vertex1",
new ProcessorDescriptor("Processor").setHistoryText("vertex1 Processor HistoryText"),
dummyTaskCount, dummyTaskResource);
v1.addInput("input1", new InputDescriptor("input.class").setHistoryText("input HistoryText"),
null);
org.apache.tez.dag.api.Vertex v2 = new org.apache.tez.dag.api.Vertex("vertex2",
new ProcessorDescriptor("Processor").setHistoryText("vertex2 Processor HistoryText"),
dummyTaskCount, dummyTaskResource);
org.apache.tez.dag.api.Vertex v3 = new org.apache.tez.dag.api.Vertex("vertex3",
new ProcessorDescriptor("Processor").setHistoryText("vertex3 Processor HistoryText"),
dummyTaskCount, dummyTaskResource);
DAG dag = new DAG("testDag");
String groupName1 = "uv12";
org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = new OutputDescriptor("output.class")
.setHistoryText("uvOut HistoryText");
uv12.addOutput("uvOut", outDesc, OutputCommitter.class);
v3.addOutput("uvOut", outDesc, OutputCommitter.class);
GroupInputEdge e1 = new GroupInputEdge(uv12, v3,
new EdgeProperty(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
new OutputDescriptor("dummy output class").setHistoryText("Dummy History Text"),
new InputDescriptor("dummy input class").setHistoryText("Dummy History Text")),
new InputDescriptor("merge.class").setHistoryText("Merge HistoryText"));
dag.addVertex(v1);
dag.addVertex(v2);
dag.addVertex(v3);
dag.addEdge(e1);
return dag.createDag(conf);
}
@Test
public void testConvertDAGPlanToATSMap() throws IOException, JSONException {
DAGPlan dagPlan = createDAG();
Map<String, Object> atsMap = DAGUtils.convertDAGPlanToATSMap(dagPlan);
Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY));
Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY));
Assert.assertTrue(atsMap.containsKey("version"));
Assert.assertEquals(1, atsMap.get("version"));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY));
Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY));
Assert.assertEquals(3, ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY)).size());
Set<String> vNames = Sets.newHashSet("vertex1", "vertex2", "vertex3");
Set<String> inEdgeIds = new HashSet<String>();
Set<String> outEdgeIds = new HashSet<String>();
int additionalInputCount = 0;
int additionalOutputCount = 0;
for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTICES_KEY))) {
Map<String, Object> v = (Map<String, Object>) o;
Assert.assertTrue(v.containsKey(DAGUtils.VERTEX_NAME_KEY));
Assert.assertTrue(v.containsKey(DAGUtils.PROCESSOR_CLASS_KEY));
Assert.assertTrue(v.containsKey(DAGUtils.USER_PAYLOAD_AS_TEXT));
if (v.containsKey(DAGUtils.IN_EDGE_IDS_KEY)) {
inEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.IN_EDGE_IDS_KEY)));
}
if (v.containsKey(DAGUtils.OUT_EDGE_IDS_KEY)) {
outEdgeIds.addAll(((Collection<String>) v.get(DAGUtils.OUT_EDGE_IDS_KEY)));
}
String vName = (String) v.get(DAGUtils.VERTEX_NAME_KEY);
Assert.assertTrue(vNames.contains(vName));
String procPayload = vName + " Processor HistoryText";
Assert.assertEquals(procPayload, v.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
if (v.containsKey(DAGUtils.ADDITIONAL_INPUTS_KEY)) {
additionalInputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY)).size();
for (Object input : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_INPUTS_KEY))) {
Map<String, Object> inputMap = (Map<String, Object>) input;
Assert.assertTrue(inputMap.containsKey(DAGUtils.NAME_KEY));
Assert.assertTrue(inputMap.containsKey(DAGUtils.CLASS_KEY));
Assert.assertFalse(inputMap.containsKey(DAGUtils.INITIALIZER_KEY));
Assert.assertEquals("input HistoryText", inputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
}
}
if (v.containsKey(DAGUtils.ADDITIONAL_OUTPUTS_KEY)) {
additionalOutputCount += ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY)).size();
for (Object output : ((Collection<?>) v.get(DAGUtils.ADDITIONAL_OUTPUTS_KEY))) {
Map<String, Object> outputMap = (Map<String, Object>) output;
Assert.assertTrue(outputMap.containsKey(DAGUtils.NAME_KEY));
Assert.assertTrue(outputMap.containsKey(DAGUtils.CLASS_KEY));
Assert.assertTrue(outputMap.containsKey(DAGUtils.INITIALIZER_KEY));
Assert.assertEquals("uvOut HistoryText", outputMap.get(DAGUtils.USER_PAYLOAD_AS_TEXT));
}
}
}
// 1 input
Assert.assertEquals(1, additionalInputCount);
// 3 outputs due to vertex group
Assert.assertEquals(3, additionalOutputCount);
// 1 edge translates to 2 due to vertex group
Assert.assertEquals(2, inEdgeIds.size());
Assert.assertEquals(2, outEdgeIds.size());
for (Object o : ((Collection<?>) atsMap.get(DAGUtils.EDGES_KEY))) {
Map<String, Object> e = (Map<String, Object>) o;
Assert.assertTrue(inEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
Assert.assertTrue(outEdgeIds.contains(e.get(DAGUtils.EDGE_ID_KEY)));
Assert.assertTrue(e.containsKey(DAGUtils.INPUT_VERTEX_NAME_KEY));
Assert.assertTrue(e.containsKey(DAGUtils.OUTPUT_VERTEX_NAME_KEY));
Assert.assertEquals(DataMovementType.SCATTER_GATHER.name(),
e.get(DAGUtils.DATA_MOVEMENT_TYPE_KEY));
Assert.assertEquals(DataSourceType.PERSISTED.name(), e.get(DAGUtils.DATA_SOURCE_TYPE_KEY));
Assert.assertEquals(SchedulingType.SEQUENTIAL.name(), e.get(DAGUtils.SCHEDULING_TYPE_KEY));
Assert.assertEquals("dummy output class", e.get(DAGUtils.EDGE_SOURCE_CLASS_KEY));
Assert.assertEquals("dummy input class", e.get(DAGUtils.EDGE_DESTINATION_CLASS_KEY));
Assert.assertEquals("Dummy History Text", e.get(DAGUtils.OUTPUT_USER_PAYLOAD_AS_TEXT));
Assert.assertEquals("Dummy History Text", e.get(DAGUtils.INPUT_USER_PAYLOAD_AS_TEXT));
}
for (Object o : ((Collection<?>) atsMap.get(DAGUtils.VERTEX_GROUPS_KEY))) {
Map<String, Object> e = (Map<String, Object>) o;
Assert.assertEquals("uv12", e.get(DAGUtils.VERTEX_GROUP_NAME_KEY));
Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_MEMBERS_KEY));
Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_OUTPUTS_KEY));
Assert.assertTrue(e.containsKey(DAGUtils.VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY));
}
}
}