blob: 50c9a6061cf6cfb9fe22073809af2de170f7587c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api.client.rpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.EnumSet;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientImpl;
import org.apache.tez.dag.api.client.DAGClientTimelineImpl;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DagStatusSource;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.TimelineReaderFactory.TimelineReaderStrategy;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusResponseProto;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
import org.apache.tez.dag.api.records.DAGProtos.ProgressProto;
import org.apache.tez.dag.api.records.DAGProtos.StatusGetOptsProto;
import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto;
import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.internal.util.collections.Sets;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestDAGClient {
private DAGClient dagClient;
private ApplicationId mockAppId;
private ApplicationReport mockAppReport;
private String dagIdStr;
private DAGClientAMProtocolBlockingPB mockProxy;
private VertexStatusProto vertexStatusProtoWithoutCounters;
private VertexStatusProto vertexStatusProtoWithCounters;
private DAGStatusProto dagStatusProtoWithoutCounters;
private DAGStatusProto dagStatusProtoWithCounters;
private void setUpData(){
// DAG
ProgressProto dagProgressProto = ProgressProto.newBuilder()
.setFailedTaskCount(1)
.setKilledTaskCount(1)
.setRunningTaskCount(2)
.setSucceededTaskCount(2)
.setTotalTaskCount(6)
.build();
TezCountersProto dagCountersProto=TezCountersProto.newBuilder()
.addCounterGroups(TezCounterGroupProto.newBuilder()
.setName("DAGGroup")
.addCounters(TezCounterProto.newBuilder()
.setDisplayName("dag_counter_1")
.setValue(99)))
.build();
dagStatusProtoWithoutCounters = DAGStatusProto.newBuilder()
.addDiagnostics("Diagnostics_0")
.setState(DAGStatusStateProto.DAG_RUNNING)
.setDAGProgress(dagProgressProto)
.addVertexProgress(
StringProgressPairProto.newBuilder().setKey("v1")
.setProgress(ProgressProto.newBuilder()
.setFailedTaskCount(0)
.setSucceededTaskCount(0)
.setKilledTaskCount(0))
)
.addVertexProgress(
StringProgressPairProto.newBuilder().setKey("v2")
.setProgress(ProgressProto.newBuilder()
.setFailedTaskCount(1)
.setSucceededTaskCount(1)
.setKilledTaskCount(1))
)
.build();
dagStatusProtoWithCounters = DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters)
.setDagCounters(dagCountersProto)
.build();
// Vertex
ProgressProto vertexProgressProto = ProgressProto.newBuilder()
.setFailedTaskCount(1)
.setKilledTaskCount(0)
.setRunningTaskCount(0)
.setSucceededTaskCount(1)
.build();
TezCountersProto vertexCountersProto=TezCountersProto.newBuilder()
.addCounterGroups(TezCounterGroupProto.newBuilder()
.addCounters(TezCounterProto.newBuilder()
.setDisplayName("vertex_counter_1")
.setValue(99)))
.build();
vertexStatusProtoWithoutCounters = VertexStatusProto.newBuilder()
.setId("vertex_1")
.addDiagnostics("V_Diagnostics_0")
.setProgress(vertexProgressProto)
.setState(VertexStatusStateProto.VERTEX_SUCCEEDED) // make sure the waitForCompletion be able to finish
.build();
vertexStatusProtoWithCounters = VertexStatusProto.newBuilder(vertexStatusProtoWithoutCounters)
.setVertexCounters(vertexCountersProto)
.build();
}
private static class DAGCounterRequestMatcher extends ArgumentMatcher<GetDAGStatusRequestProto>{
@Override
public boolean matches(Object argument) {
if (argument instanceof GetDAGStatusRequestProto){
GetDAGStatusRequestProto requestProto = (GetDAGStatusRequestProto)argument;
return requestProto.getStatusOptionsCount() != 0
&& requestProto.getStatusOptionsList().get(0) == StatusGetOptsProto.GET_COUNTERS;
}
return false;
}
}
private static class VertexCounterRequestMatcher extends ArgumentMatcher<GetVertexStatusRequestProto>{
@Override
public boolean matches(Object argument) {
if (argument instanceof GetVertexStatusRequestProto){
GetVertexStatusRequestProto requestProto = (GetVertexStatusRequestProto)argument;
return requestProto.getStatusOptionsCount() != 0
&& requestProto.getStatusOptionsList().get(0) == StatusGetOptsProto.GET_COUNTERS;
}
return false;
}
}
@Before
public void setUp() throws YarnException, IOException, TezException, ServiceException{
setUpData();
/////////////// mock //////////////////////
mockAppId = mock(ApplicationId.class);
mockAppReport = mock(ApplicationReport.class);
dagIdStr = "dag_9999_0001_1";
mockProxy = mock(DAGClientAMProtocolBlockingPB.class);
// return the response with Counters is the request match the CounterMatcher
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters).build());
when(mockProxy.getDAGStatus(isNull(RpcController.class), argThat(new DAGCounterRequestMatcher())))
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithCounters).build());
when(mockProxy.getVertexStatus(isNull(RpcController.class), any(GetVertexStatusRequestProto.class)))
.thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithoutCounters).build());
when(mockProxy.getVertexStatus(isNull(RpcController.class), argThat(new VertexCounterRequestMatcher())))
.thenReturn(GetVertexStatusResponseProto.newBuilder().setVertexStatus(vertexStatusProtoWithCounters).build());
TezConfiguration tezConf = new TezConfiguration();
dagClient = new DAGClientImpl(mockAppId, dagIdStr, tezConf, null,
UserGroupInformation.getCurrentUser());
DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
realClient.appReport = mockAppReport;
realClient.proxy = mockProxy;
}
@Test(timeout = 5000)
public void testApp() throws IOException, TezException, ServiceException{
assertTrue(dagClient.getExecutionContext().contains(mockAppId.toString()));
assertEquals(mockAppId.toString(), dagClient.getSessionIdentifierString());
assertEquals(dagIdStr, dagClient.getDagIdentifierString());
DAGClientRPCImpl realClient = (DAGClientRPCImpl)((DAGClientImpl)dagClient).getRealClient();
assertEquals(mockAppReport, realClient.getApplicationReportInternal());
}
@Test(timeout = 5000)
public void testDAGStatus() throws Exception{
DAGStatus resultDagStatus = dagClient.getDAGStatus(null);
verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
.setDagId(dagIdStr).setTimeout(0).build());
assertEquals(new DAGStatus(dagStatusProtoWithoutCounters, DagStatusSource.AM), resultDagStatus);
System.out.println("DAGStatusWithoutCounter:" + resultDagStatus);
resultDagStatus = dagClient.getDAGStatus(Sets.newSet(StatusGetOpts.GET_COUNTERS));
verify(mockProxy, times(1)).getDAGStatus(null, GetDAGStatusRequestProto.newBuilder()
.setDagId(dagIdStr).setTimeout(0).addStatusOptions(StatusGetOptsProto.GET_COUNTERS).build());
assertEquals(new DAGStatus(dagStatusProtoWithCounters, DagStatusSource.AM), resultDagStatus);
System.out.println("DAGStatusWithCounter:" + resultDagStatus);
}
@Test(timeout = 5000)
public void testVertexStatus() throws Exception{
VertexStatus resultVertexStatus = dagClient.getVertexStatus("v1", null);
verify(mockProxy).getVertexStatus(null, GetVertexStatusRequestProto.newBuilder()
.setDagId(dagIdStr).setVertexName("v1").build());
assertEquals(new VertexStatus(vertexStatusProtoWithoutCounters), resultVertexStatus);
System.out.println("VertexWithoutCounter:" + resultVertexStatus);
resultVertexStatus = dagClient.getVertexStatus("v1", Sets.newSet(StatusGetOpts.GET_COUNTERS));
verify(mockProxy).getVertexStatus(null, GetVertexStatusRequestProto.newBuilder()
.setDagId(dagIdStr).setVertexName("v1").addStatusOptions(StatusGetOptsProto.GET_COUNTERS)
.build());
assertEquals(new VertexStatus(vertexStatusProtoWithCounters), resultVertexStatus);
System.out.println("VertexWithCounter:" + resultVertexStatus);
}
@Test(timeout = 5000)
public void testTryKillDAG() throws Exception{
dagClient.tryKillDAG();
verify(mockProxy, times(1)).tryKillDAG(null, TryKillDAGRequestProto.newBuilder()
.setDagId(dagIdStr).build());
}
@Test(timeout = 5000)
public void testWaitForCompletion() throws Exception{
// first time return DAG_RUNNING, second time return DAG_SUCCEEDED
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(dagStatusProtoWithoutCounters)
.build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
(DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters)
.setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
.build());
dagClient.waitForCompletion();
ArgumentCaptor<RpcController> rpcControllerArgumentCaptor =
ArgumentCaptor.forClass(RpcController.class);
ArgumentCaptor<GetDAGStatusRequestProto> argumentCaptor =
ArgumentCaptor.forClass(GetDAGStatusRequestProto.class);
verify(mockProxy, times(2))
.getDAGStatus(rpcControllerArgumentCaptor.capture(), argumentCaptor.capture());
}
@Test(timeout = 5000)
public void testWaitForCompletionWithStatusUpdates() throws Exception{
// first time and second time return DAG_RUNNING, third time return DAG_SUCCEEDED
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
(DAGStatusProto.newBuilder(dagStatusProtoWithoutCounters)
.setState(DAGStatusStateProto.DAG_SUCCEEDED).build())
.build());
// first time for getVertexSet
// second & third time for check completion
ArgumentCaptor<RpcController> rpcControllerArgumentCaptor =
ArgumentCaptor.forClass(RpcController.class);
ArgumentCaptor<GetDAGStatusRequestProto> argumentCaptor =
ArgumentCaptor.forClass(GetDAGStatusRequestProto.class);
dagClient.waitForCompletionWithStatusUpdates(null);
// 2 from initial request - when status isn't cached. 1 for vertex names. 1 for final wait.
verify(mockProxy, times(4))
.getDAGStatus(rpcControllerArgumentCaptor.capture(), argumentCaptor.capture());
when(mockProxy.getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class)))
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus(
DAGStatusProto.newBuilder(dagStatusProtoWithCounters)
.setState(DAGStatusStateProto.DAG_RUNNING).build()).build())
.thenReturn(GetDAGStatusResponseProto.newBuilder().setDagStatus
(DAGStatusProto.newBuilder(dagStatusProtoWithCounters).setState(
DAGStatusStateProto.DAG_SUCCEEDED).build())
.build());
rpcControllerArgumentCaptor =
ArgumentCaptor.forClass(RpcController.class);
argumentCaptor =
ArgumentCaptor.forClass(GetDAGStatusRequestProto.class);
dagClient.waitForCompletionWithStatusUpdates(Sets.newSet(StatusGetOpts.GET_COUNTERS));
// 4 from past invocation in the test, 2 from initial request - when status isn't cached. 1 for vertex names. 1 for final wait.
verify(mockProxy, times(8))
.getDAGStatus(rpcControllerArgumentCaptor.capture(), argumentCaptor.capture());
}
@Test(timeout = 50000)
public void testGetDagStatusWithTimeout() throws Exception {
long startTime;
long endTime;
long diff;
TezConfiguration tezConf = new TezConfiguration();
tezConf.setLong(TezConfiguration.TEZ_DAG_STATUS_POLLINTERVAL_MS, 800l);
DAGClientImplForTest dagClient = new DAGClientImplForTest(mockAppId, dagIdStr, tezConf, null);
DAGClientRPCImplForTest dagClientRpc =
new DAGClientRPCImplForTest(mockAppId, dagIdStr, tezConf, null);
dagClient.setRealClient(dagClientRpc);
DAGStatus dagStatus;
// Fetch from RM. AM not up yet.
dagClientRpc.setAMProxy(null);
DAGStatus rmDagStatus =
new DAGStatus(constructDagStatusProto(DAGStatusStateProto.DAG_SUBMITTED),
DagStatusSource.RM);
dagClient.setRmDagStatus(rmDagStatus);
startTime = System.currentTimeMillis();
dagStatus = dagClient.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
endTime = System.currentTimeMillis();
diff = endTime - startTime;
assertTrue(diff > 1500l && diff < 2500l);
// One at start. Second and Third within the sleep. Fourth at final refresh.
assertEquals(0, dagClientRpc.numGetStatusViaAmInvocations); // No AM available, so no invocations to AM
assertEquals(4, dagClient.numGetStatusViaRmInvocations);
assertEquals(DAGStatus.State.SUBMITTED, dagStatus.getState());
// Fetch from AM. RUNNING
dagClient.resetCounters();
dagClientRpc.resetCountesr();
rmDagStatus =
new DAGStatus(constructDagStatusProto(DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
dagClient.setRmDagStatus(rmDagStatus);
dagClientRpc.setAMProxy(createMockProxy(DAGStatusStateProto.DAG_RUNNING, -1));
startTime = System.currentTimeMillis();
dagStatus = dagClient.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
endTime = System.currentTimeMillis();
diff = endTime - startTime;
assertTrue(diff > 1500l && diff < 2500l);
// Directly from AM
assertEquals(0, dagClient.numGetStatusViaRmInvocations);
// Directly from AM - one refresh. One with timeout.
assertEquals(2, dagClientRpc.numGetStatusViaAmInvocations);
assertEquals(DAGStatus.State.RUNNING, dagStatus.getState());
// Fetch from AM. Success.
dagClient.resetCounters();
dagClientRpc.resetCountesr();
rmDagStatus =
new DAGStatus(constructDagStatusProto(DAGStatusStateProto.DAG_RUNNING), DagStatusSource.RM);
dagClient.setRmDagStatus(rmDagStatus);
dagClientRpc.setAMProxy(createMockProxy(DAGStatusStateProto.DAG_SUCCEEDED, 1000l));
startTime = System.currentTimeMillis();
dagStatus = dagClient.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
endTime = System.currentTimeMillis();
diff = endTime - startTime;
assertTrue(diff > 500l && diff < 1500l);
// Directly from AM
assertEquals(0, dagClient.numGetStatusViaRmInvocations);
// Directly from AM - previous request cached, so single invocation only.
assertEquals(1, dagClientRpc.numGetStatusViaAmInvocations);
assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
}
@Test(timeout = 5000)
public void testDagClientTimelineEnabledCondition() throws IOException {
String historyLoggingClass = "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
testAtsEnabled(mockAppId, dagIdStr, false, "", true, true);
testAtsEnabled(mockAppId, dagIdStr, false, historyLoggingClass, false, true);
testAtsEnabled(mockAppId, dagIdStr, false, historyLoggingClass, true, false);
testAtsEnabled(mockAppId, dagIdStr, DAGClientTimelineImpl.isSupported(), historyLoggingClass,
true, true);
}
private static void testAtsEnabled(ApplicationId appId, String dagIdStr, boolean expected,
String loggingClass, boolean amHistoryLoggingEnabled,
boolean dagHistoryLoggingEnabled) throws IOException {
TezConfiguration tezConf = new TezConfiguration();
tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, loggingClass);
tezConf.setBoolean(TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED, amHistoryLoggingEnabled);
tezConf.setBoolean(TezConfiguration.TEZ_DAG_HISTORY_LOGGING_ENABLED, dagHistoryLoggingEnabled);
DAGClientImplForTest dagClient = new DAGClientImplForTest(appId, dagIdStr, tezConf, null);
assertEquals(expected, dagClient.getIsATSEnabled());
}
private static class DAGClientRPCImplForTest extends DAGClientRPCImpl {
int numGetStatusViaAmInvocations = 0;
public DAGClientRPCImplForTest(ApplicationId appId, String dagId,
TezConfiguration conf,
@Nullable FrameworkClient frameworkClient) throws IOException {
super(appId, dagId, conf, frameworkClient, UserGroupInformation.getCurrentUser());
}
void setAMProxy(DAGClientAMProtocolBlockingPB proxy) {
this.proxy = proxy;
}
void resetCountesr() {
numGetStatusViaAmInvocations = 0;
}
@Override
boolean createAMProxyIfNeeded() throws IOException, TezException {
if (proxy == null) {
return false;
} else {
return true;
}
}
@Override
DAGStatus getDAGStatusViaAM(Set<StatusGetOpts> statusOptions, long timeout)
throws IOException, TezException {
numGetStatusViaAmInvocations++;
return super.getDAGStatusViaAM(statusOptions, timeout);
}
}
private static class DAGClientImplForTest extends DAGClientImpl {
private DAGStatus rmDagStatus;
int numGetStatusViaRmInvocations = 0;
public DAGClientImplForTest(ApplicationId appId, String dagId, TezConfiguration conf,
@Nullable FrameworkClient frameworkClient) throws IOException {
super(appId, dagId, conf, frameworkClient, UserGroupInformation.getCurrentUser());
}
private void setRealClient(DAGClientRPCImplForTest dagClientRpcImplForTest) {
this.realClient = dagClientRpcImplForTest;
}
void setRmDagStatus(DAGStatus rmDagStatus) {
this.rmDagStatus = rmDagStatus;
}
void resetCounters() {
numGetStatusViaRmInvocations = 0;
}
@Override
protected DAGStatus getDAGStatusViaRM() throws TezException, IOException {
numGetStatusViaRmInvocations++;
return rmDagStatus;
}
public boolean getIsATSEnabled() {
return isATSEnabled;
}
}
private DAGProtos.DAGStatusProto.Builder constructDagStatusProto(DAGStatusStateProto stateProto) {
DAGProtos.DAGStatusProto.Builder builder = DAGProtos.DAGStatusProto.newBuilder();
builder.setState(stateProto);
return builder;
}
private DAGClientAMProtocolBlockingPB createMockProxy(final DAGStatusStateProto stateProto,
final long timeout) throws
ServiceException {
DAGClientAMProtocolBlockingPB mock = mock(DAGClientAMProtocolBlockingPB.class);
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
GetDAGStatusRequestProto request = (GetDAGStatusRequestProto) invocation.getArguments()[1];
long sleepTime = request.getTimeout();
if (timeout != -1) {
sleepTime = timeout;
}
Thread.sleep(sleepTime);
return GetDAGStatusResponseProto.newBuilder().setDagStatus(constructDagStatusProto(
stateProto)).build();
}
}).when(mock).getDAGStatus(isNull(RpcController.class), any(GetDAGStatusRequestProto.class));
return mock;
}
@Test
/* testing idea is borrowed from YARN-5309 */
public void testTimelineClientCleanup() throws Exception {
TezConfiguration tezConf = new TezConfiguration();
tezConf.set("yarn.http.policy", "HTTPS_ONLY");
File testDir = new File(System.getProperty("java.io.tmpdir"));
String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestDAGClient.class);
KeyStoreTestUtil.setupSSLConfig(testDir.getAbsolutePath(), sslConfDir, tezConf, false);
DAGClientTimelineImpl dagClient =
new DAGClientTimelineImpl(mockAppId, dagIdStr, tezConf, mock(FrameworkClient.class), 10000);
Field field = DAGClientTimelineImpl.class.getDeclaredField("timelineReaderStrategy");
field.setAccessible(true);
TimelineReaderStrategy strategy = (TimelineReaderStrategy) field.get(dagClient);
strategy.getHttpClient(); // calls SSLFactory.init
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
while (threadGroup.getParent() != null) {
threadGroup = threadGroup.getParent();
}
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
Thread reloaderThread = null;
for (Thread thread : threads) {
if ((thread.getName() != null) && (thread.getName().contains("Truststore reloader thread"))) {
reloaderThread = thread;
}
}
Assert.assertTrue("Reloader is not alive", reloaderThread.isAlive());
dagClient.close();
boolean reloaderStillAlive = true;
for (int i = 0; i < 10; i++) {
reloaderStillAlive = reloaderThread.isAlive();
if (!reloaderStillAlive) {
break;
}
Thread.sleep(1000);
}
Assert.assertFalse("Reloader is still alive", reloaderStillAlive);
}
}