blob: 8650aeaf9fbaad615de121cf9bf663b051134913 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Maps;
@SuppressWarnings("deprecation")
public class TestMockDAGAppMaster {
static Configuration defaultConf;
static FileSystem localFs;
static Path workDir;
static {
try {
defaultConf = new Configuration(false);
defaultConf.set("fs.defaultFS", "file:///");
defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
localFs = FileSystem.getLocal(defaultConf);
workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
"TestDAGAppMaster").makeQualified(localFs);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
@Test (timeout = 5000)
public void testLocalResourceSetup() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
Map<String, LocalResource> lrDAG = Maps.newHashMap();
String lrName1 = "LR1";
lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
Map<String, LocalResource> lrVertex = Maps.newHashMap();
String lrName2 = "LR2";
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
ContainerData cData = mockLauncher.getContainers().values().iterator().next();
ContainerLaunchContext launchContext = cData.launchContext;
Map<String, LocalResource> taskLR = launchContext.getLocalResources();
// verify tasks are launched with both DAG and task resources.
Assert.assertTrue(taskLR.containsKey(lrName1));
Assert.assertTrue(taskLR.containsKey(lrName2));
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
}
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
String lrName1 = "LR1";
lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
Map<String, LocalResource> lrVertex = Maps.newHashMap();
String lrName2 = "LR2";
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
tezClient.start();
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
// submit the same DAG again to verify it can be done.
tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
tezClient.start();
dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
}
@Test (timeout = 10000)
public void testSchedulerErrorHandling() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
mockApp.handle(new DAGAppMasterEventSchedulingServiceError(new RuntimeException("Mock error")));
while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
Thread.sleep(100);
}
Assert.assertEquals(DAGState.RUNNING, mockApp.getContext().getCurrentDAG().getState());
}
}