blob: 4fb563c65d49a70b289ed88ccacc4815e0c8bb54 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api.client;
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.records.TezDAGID;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.internal.util.collections.Sets;
public class TestDAGClientHandler {
@Test(timeout = 1000)
@SuppressWarnings("unchecked")
public void testDAGClientHandler() throws TezException {
TezDAGID mockTezDAGId = mock(TezDAGID.class);
when(mockTezDAGId.getId()).thenReturn(1);
when(mockTezDAGId.toString()).thenReturn("dag_9999_0001_1");
DAG mockDAG = mock(DAG.class);
when(mockDAG.getID()).thenReturn(mockTezDAGId);
DAGStatusBuilder mockDagStatusBuilder = mock(DAGStatusBuilder.class);
when(mockDAG.getDAGStatus(anySetOf(StatusGetOpts.class))).thenReturn(
mockDagStatusBuilder);
VertexStatusBuilder mockVertexStatusBuilder =
mock(VertexStatusBuilder.class);
when(mockDAG.getVertexStatus(anyString(), anySetOf(StatusGetOpts.class)))
.thenReturn(mockVertexStatusBuilder);
DAGAppMaster mockDagAM = mock(DAGAppMaster.class);
AppContext mockAppContext = mock(AppContext.class);
when(mockDagAM.getContext()).thenReturn(mockAppContext);
when(mockDagAM.getContext().getCurrentDAG()).thenReturn(mockDAG);
DAGClientHandler dagClientHandler = new DAGClientHandler(mockDagAM);
// getAllDAGs()
assertEquals(1, dagClientHandler.getAllDAGs().size());
assertEquals("dag_9999_0001_1", dagClientHandler.getAllDAGs().get(0));
// getDAGStatus
try {
dagClientHandler.getDAGStatus("dag_9999_0001_2", Sets.newSet(StatusGetOpts.GET_COUNTERS));
fail("should not come here");
} catch (TezException e) {
assertTrue(e.getMessage().contains("Unknown dagId"));
}
DAGStatus dagStatus = dagClientHandler.getDAGStatus("dag_9999_0001_1",
Sets.newSet(StatusGetOpts.GET_COUNTERS));
assertEquals(mockDagStatusBuilder, dagStatus);
// getVertexStatus
try {
dagClientHandler.getVertexStatus("dag_9999_0001_2", "v1", Sets.newSet(StatusGetOpts.GET_COUNTERS));
fail("should not come here");
} catch (TezException e) {
assertTrue(e.getMessage().contains("Unknown dagId"));
}
VertexStatus vertexStatus = dagClientHandler.getVertexStatus("dag_9999_0001_1", "v1",
Sets.newSet(StatusGetOpts.GET_COUNTERS));
assertEquals(mockVertexStatusBuilder, vertexStatus);
// getSessionStatus
when(mockDagAM.isSession()).thenReturn(false);
try{
dagClientHandler.getSessionStatus();
fail("should not come here");
}catch(TezException e){
assertEquals("Unsupported operation as AM not running in session mode", e.getMessage());
}
when(mockDagAM.isSession()).thenReturn(true);
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.INITED);
assertEquals(TezAppMasterStatus.INITIALIZING, dagClientHandler.getSessionStatus());
when(mockDagAM.getState()).thenReturn(DAGAppMasterState.ERROR);
assertEquals(TezAppMasterStatus.SHUTDOWN, dagClientHandler.getSessionStatus());
// startPreWarmContainers
PreWarmContext mockPreWarnContext = mock(PreWarmContext.class);
dagClientHandler.preWarmContainers(mockPreWarnContext);
verify(mockDagAM).startPreWarmContainers(mockPreWarnContext);
// tryKillDAG
try{
dagClientHandler.tryKillDAG("dag_9999_0001_2");
fail("should not come here");
}catch(TezException e){
assertTrue(e.getMessage().contains("Unknown dagId"));
}
dagClientHandler.tryKillDAG("dag_9999_0001_1");
ArgumentCaptor<DAG> eventCaptor = ArgumentCaptor.forClass(DAG.class);
verify(mockDagAM, times(1)).tryKillDAG(eventCaptor.capture());
assertEquals(1, eventCaptor.getAllValues().size());
assertTrue(eventCaptor.getAllValues().get(0) instanceof DAG);
assertEquals("dag_9999_0001_1", ((DAG)eventCaptor.getAllValues().get(0)).getID().toString());
// submitDAG
DAGPlan dagPlan = DAGPlan.getDefaultInstance();
Map<String,LocalResource> localResources = new HashMap<String, LocalResource>();
dagClientHandler.submitDAG(dagPlan, localResources);
verify(mockDagAM).submitDAGToAppMaster(dagPlan, localResources);
// shutdown
dagClientHandler.shutdownAM();
verify(mockDagAM).shutdownTezAM();
}
}