| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.test; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.tez.client.TezClient; |
| import org.apache.tez.dag.api.DAG; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TezConstants; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.api.Vertex; |
| import org.apache.tez.dag.api.client.DAGClient; |
| import org.apache.tez.dag.api.client.DAGStatus; |
| import org.apache.tez.dag.api.client.VertexStatus; |
| import org.apache.tez.examples.OrderedWordCount; |
| import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.LogicalInput; |
| import org.apache.tez.runtime.api.LogicalOutput; |
| import org.apache.tez.runtime.api.ProcessorContext; |
| import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; |
| import org.apache.tez.runtime.library.processor.SleepProcessor; |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import static org.junit.Assert.*; |
| |
| /** |
| * Tests for running Tez in local execution mode (without YARN). |
| */ |
| @RunWith(Parameterized.class) |
| public class TestLocalMode { |
| |
| private static final File STAGING_DIR = new File(System.getProperty("test.build.data"), |
| TestLocalMode.class.getName()); |
| |
| private static MiniDFSCluster dfsCluster; |
| private static FileSystem remoteFs; |
| |
| private final boolean useDfs; |
| private final boolean useLocalModeWithoutNetwork; |
| |
| @Parameterized.Parameters(name = "useDFS:{0} useLocalModeWithoutNetwork:{1}") |
| public static Collection<Object[]> params() { |
| return Arrays.asList(new Object[][]{{false, false}, {true, false}, {false, true}, {true, true}}); |
| } |
| |
| public TestLocalMode(boolean useDfs, boolean useLocalModeWithoutNetwork) { |
| this.useDfs = useDfs; |
| this.useLocalModeWithoutNetwork = useLocalModeWithoutNetwork; |
| } |
| |
| @BeforeClass |
| public static void beforeClass() throws Exception { |
| try { |
| Configuration conf = new Configuration(); |
| dfsCluster = |
| new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true) |
| .racks(null).build(); |
| remoteFs = dfsCluster.getFileSystem(); |
| } catch (IOException io) { |
| throw new RuntimeException("problem starting mini dfs cluster", io); |
| } |
| } |
| |
| @AfterClass |
| public static void afterClass() throws InterruptedException { |
| if (dfsCluster != null) { |
| try { |
| dfsCluster.shutdown(); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| |
| private TezConfiguration createConf() { |
| TezConfiguration conf = new TezConfiguration(); |
| conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); |
| conf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK, useLocalModeWithoutNetwork); |
| |
| if (useDfs) { |
| conf.set("fs.defaultFS", remoteFs.getUri().toString()); |
| } else { |
| conf.set("fs.defaultFS", "file:///"); |
| } |
| conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, STAGING_DIR.getAbsolutePath()); |
| conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); |
| return conf; |
| } |
| |
| @Test(timeout = 30000) |
| public void testMultipleClientsWithSession() throws TezException, InterruptedException, |
| IOException { |
| TezConfiguration tezConf1 = createConf(); |
| TezClient tezClient1 = TezClient.create("commonName", tezConf1, true); |
| tezClient1.start(); |
| |
| DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); |
| |
| DAGClient dagClient1 = tezClient1.submitDAG(dag1); |
| dagClient1.waitForCompletion(); |
| assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.SUCCEEDED, |
| dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| |
| dagClient1.close(); |
| tezClient1.stop(); |
| |
| TezConfiguration tezConf2 = createConf(); |
| DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); |
| TezClient tezClient2 = TezClient.create("commonName", tezConf2, true); |
| tezClient2.start(); |
| DAGClient dagClient2 = tezClient2.submitDAG(dag2); |
| dagClient2.waitForCompletion(); |
| assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.SUCCEEDED, |
| dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext())); |
| dagClient2.close(); |
| tezClient2.stop(); |
| } |
| |
| @Test(timeout = 10000) |
| public void testMultipleClientsWithoutSession() throws TezException, InterruptedException, |
| IOException { |
| TezConfiguration tezConf1 = createConf(); |
| TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); |
| tezClient1.start(); |
| |
| DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); |
| |
| DAGClient dagClient1 = tezClient1.submitDAG(dag1); |
| dagClient1.waitForCompletion(); |
| assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.SUCCEEDED, |
| dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| dagClient1.close(); |
| tezClient1.stop(); |
| |
| |
| TezConfiguration tezConf2 = createConf(); |
| DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName()); |
| TezClient tezClient2 = TezClient.create("commonName", tezConf2, false); |
| tezClient2.start(); |
| DAGClient dagClient2 = tezClient2.submitDAG(dag2); |
| dagClient2.waitForCompletion(); |
| assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.SUCCEEDED, |
| dagClient2.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext())); |
| dagClient2.close(); |
| tezClient2.stop(); |
| } |
| |
| @Test(timeout = 20000) |
| public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException, |
| IOException { |
| TezConfiguration tezConf1 = createConf(); |
| // Run in non-session mode so that the AM terminates |
| TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); |
| tezClient1.start(); |
| |
| DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName()); |
| |
| DAGClient dagClient1 = tezClient1.submitDAG(dag1); |
| dagClient1.waitForCompletion(); |
| assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.SUCCEEDED, |
| dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| // Sleep for more time than is required for the DAG to complete. |
| Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5)); |
| |
| dagClient1.close(); |
| tezClient1.stop(); |
| } |
| |
| @Test(timeout = 20000) |
| public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, |
| IOException { |
| TezConfiguration tezConf1 = createConf(); |
| // Run in non-session mode so that the AM terminates |
| TezClient tezClient1 = TezClient.create("commonName", tezConf1, false); |
| tezClient1.start(); |
| |
| DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName()); |
| |
| DAGClient dagClient1 = tezClient1.submitDAG(dag1); |
| dagClient1.waitForCompletion(); |
| assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState()); |
| assertEquals(VertexStatus.State.FAILED, |
| dagClient1.getVertexStatus(SleepProcessor.SLEEP_VERTEX_NAME, null).getState()); |
| // Sleep for more time than is required for the DAG to complete. |
| Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5)); |
| |
| dagClient1.close(); |
| tezClient1.stop(); |
| } |
| |
| public static class FailingProcessor extends AbstractLogicalIOProcessor { |
| |
| public FailingProcessor(ProcessorContext context) { |
| super(context); |
| } |
| |
| @Override |
| public void initialize() throws Exception { |
| } |
| |
| @Override |
| public void handleEvents(List<Event> processorEvents) { |
| } |
| |
| @Override |
| public void close() throws Exception { |
| } |
| |
| @Override |
| public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws |
| Exception { |
| throw new TezException("FailingProcessor"); |
| |
| } |
| } |
| |
| private DAG createSimpleDAG(String dagName, String processorName) { |
| DAG dag = DAG.create(dagName).addVertex(Vertex.create(SleepProcessor.SLEEP_VERTEX_NAME, ProcessorDescriptor |
| .create(processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1)); |
| return dag; |
| } |
| |
| @Test(timeout=30000) |
| public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException { |
| int dags = 2;//two dags will be submitted to session |
| String[] inputPaths = new String[dags]; |
| String[] outputPaths = new String[dags]; |
| DAGClient[] dagClients = new DAGClient[dags]; |
| |
| TezConfiguration tezConf = createConf(); |
| TezClient tezClient = TezClient.create("testMultiDAGOnSession", tezConf, true); |
| tezClient.start(); |
| |
| //create inputs and outputs |
| FileSystem fs = FileSystem.get(tezConf); |
| for(int i = 0; i < dags; i++) { |
| inputPaths[i] = new Path(STAGING_DIR.getAbsolutePath(), "in-" + i).toString(); |
| createInputFile(fs, inputPaths[i]); |
| outputPaths[i] = new Path(STAGING_DIR.getAbsolutePath(), "out-" + i).toString(); |
| } |
| |
| //start testing |
| try { |
| for (int i=0; i<inputPaths.length; ++i) { |
| DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], 1, |
| false, false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session |
| |
| tezClient.waitTillReady(); |
| System.out.println("Running dag number " + i); |
| dagClients[i] = tezClient.submitDAG(dag); |
| |
| // wait to finish |
| DAGStatus dagStatus = dagClients[i].waitForCompletion(); |
| if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { |
| fail("Iteration " + i + " failed with diagnostics: " |
| + dagStatus.getDiagnostics()); |
| } |
| //verify all dags sharing the same execution context |
| if(i>0) { |
| assertTrue(dagClients[i-1].getExecutionContext().equals(dagClients[i].getExecutionContext())); |
| } |
| } |
| } finally { |
| tezClient.stop(); |
| } |
| } |
| |
| private void createInputFile(FileSystem fs, String path) throws IOException { |
| Path file = new Path(new Path(path), "input.txt"); |
| try { |
| FSDataOutputStream fsdos = fs.create(file); |
| fsdos.write("This is a small test file !".getBytes()); |
| fsdos.flush(); |
| fsdos.close(); |
| } catch (IOException ioe) { |
| fail("Can not create input File!"); |
| } |
| } |
| } |