blob: 7d50c0671b45e0012f182e52ce204821f8952b6f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.junit.Test;
public class TestLocalMode {
@Test(timeout = 30000)
public void testMultipleClientsWithSession() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf1.set("fs.defaultFS", "file:///");
tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
TezClient tezClient1 = TezClient.create("commonName", tezConf1, true);
tezClient1.start();
DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
dagClient1.close();
tezClient1.stop();
TezConfiguration tezConf2 = new TezConfiguration();
tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf2.set("fs.defaultFS", "file:///");
tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, true);
tezClient2.start();
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
}
@Test(timeout = 10000)
public void testMultipleClientsWithoutSession() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf1.set("fs.defaultFS", "file:///");
tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
dagClient1.close();
tezClient1.stop();
TezConfiguration tezConf2 = new TezConfiguration();
tezConf2.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf2.set("fs.defaultFS", "file:///");
tezConf2.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
DAG dag2 = createSimpleDAG("dag2", SleepProcessor.class.getName());
TezClient tezClient2 = TezClient.create("commonName", tezConf2, false);
tezClient2.start();
DAGClient dagClient2 = tezClient2.submitDAG(dag2);
dagClient2.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient2.getDAGStatus(null).getState());
assertFalse(dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
dagClient2.close();
tezClient2.stop();
}
@Test(timeout = 20000)
public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf1.set("fs.defaultFS", "file:///");
tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
DAG dag1 = createSimpleDAG("dag1", SleepProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.SUCCEEDED, dagClient1.getDAGStatus(null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
dagClient1.close();
tezClient1.stop();
}
@Test(timeout = 20000)
public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException,
IOException {
TezConfiguration tezConf1 = new TezConfiguration();
tezConf1.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
tezConf1.set("fs.defaultFS", "file:///");
tezConf1.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
// Run in non-session mode so that the AM terminates
TezClient tezClient1 = TezClient.create("commonName", tezConf1, false);
tezClient1.start();
DAG dag1 = createSimpleDAG("dag1", FailingProcessor.class.getName());
DAGClient dagClient1 = tezClient1.submitDAG(dag1);
dagClient1.waitForCompletion();
assertEquals(DAGStatus.State.FAILED, dagClient1.getDAGStatus(null).getState());
// Sleep for more time than is required for the DAG to complete.
Thread.sleep((long) (TezConstants.TEZ_DAG_SLEEP_TIME_BEFORE_EXIT * 1.5));
dagClient1.close();
tezClient1.stop();
}
public static class FailingProcessor extends AbstractLogicalIOProcessor {
public FailingProcessor(ProcessorContext context) {
super(context);
}
@Override
public void initialize() throws Exception {
}
@Override
public void handleEvents(List<Event> processorEvents) {
}
@Override
public void close() throws Exception {
}
@Override
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws
Exception {
throw new TezException("FailingProcessor");
}
}
private DAG createSimpleDAG(String dagName, String processorName) {
DAG dag = DAG.create(dagName).addVertex(Vertex.create("Sleep", ProcessorDescriptor.create(
processorName).setUserPayload(
new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1));
return dag;
}
}