blob: 610448ca07197022e281753bf8352cc5e40b50ae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.launcher;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestContainerLauncherImpl {
static final Log LOG = LogFactory.getLog(TestContainerLauncherImpl.class);
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private Map<String, ByteBuffer> serviceResponse =
new HashMap<String, ByteBuffer>();
@Before
public void setup() throws IOException {
serviceResponse.clear();
serviceResponse.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeMetaData(80));
}
// tests here mock ContainerManagementProtocol which does not have close
// method. creating an interface that implements ContainerManagementProtocol
// and Closeable so the tests does not fail with NoSuchMethodException
private static interface ContainerManagementProtocolClient extends
ContainerManagementProtocol, Closeable {
}
private static class ContainerLauncherImplUnderTest extends
ContainerLauncherImpl {
private ContainerManagementProtocol containerManager;
public ContainerLauncherImplUnderTest(AppContext context,
ContainerManagementProtocol containerManager) {
super(context);
this.containerManager = containerManager;
}
@Override
public ContainerManagementProtocolProxyData getCMProxy(
String containerMgrBindAddr, ContainerId containerId)
throws IOException {
ContainerManagementProtocolProxyData protocolProxy =
mock(ContainerManagementProtocolProxyData.class);
when(protocolProxy.getContainerManagementProtocol()).thenReturn(
containerManager);
return protocolProxy;
}
public void waitForPoolToIdle() throws InterruptedException {
//I wish that we did not need the sleep, but it is here so that we are sure
// That the other thread had time to insert the event into the queue and
// start processing it. For some reason we were getting interrupted
// exceptions within eventQueue without this sleep.
Thread.sleep(100l);
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
while(!this.eventQueue.isEmpty() ||
!this.launcherPool.getQueue().isEmpty() ||
this.launcherPool.getActiveCount() > 0) {
Thread.sleep(100l);
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
}
LOG.debug("POOL SIZE 1: "+this.eventQueue.size()+
" POOL SIZE 2: "+this.launcherPool.getQueue().size()+
" ACTIVE COUNT: "+ this.launcherPool.getActiveCount());
}
}
public static ContainerId makeContainerId(long ts, int appId, int attemptId,
int id) {
return ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(ts, appId), attemptId), id);
}
public static TaskAttemptId makeTaskAttemptId(long ts, int appId, int taskId,
TaskType taskType, int id) {
ApplicationId aID = ApplicationId.newInstance(ts, appId);
JobId jID = MRBuilderUtils.newJobId(aID, id);
TaskId tID = MRBuilderUtils.newTaskId(jID, taskId, taskType);
return MRBuilderUtils.newTaskAttemptId(tID, id);
}
@Test(timeout = 5000)
public void testHandle() throws Exception {
LOG.info("STARTING testHandle");
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
String cmAddress = "127.0.0.1:8000";
ContainerManagementProtocolClient mockCM =
mock(ContainerManagementProtocolClient.class);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockCM);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
StartContainersResponse startResp =
recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM).startContainers(any(StartContainersRequest.class));
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
ut.waitForPoolToIdle();
verify(mockCM).stopContainers(any(StopContainersRequest.class));
} finally {
ut.stop();
}
}
@Test(timeout = 5000)
public void testOutOfOrder() throws Exception {
LOG.info("STARTING testOutOfOrder");
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManagementProtocolClient mockCM =
mock(ContainerManagementProtocolClient.class);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockCM);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainersResponse startResp =
recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
ut.waitForPoolToIdle();
verify(mockCM, never()).stopContainers(any(StopContainersRequest.class));
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM, never()).startContainers(any(StartContainersRequest.class));
} finally {
ut.stop();
}
}
@Test(timeout = 5000)
public void testMyShutdown() throws Exception {
LOG.info("in test Shutdown");
AppContext mockContext = mock(AppContext.class);
@SuppressWarnings("rawtypes")
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManagementProtocolClient mockCM =
mock(ContainerManagementProtocolClient.class);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockCM);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainersResponse startResp =
recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockCM.startContainers(any(StartContainersRequest.class))).thenReturn(startResp);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
ut.waitForPoolToIdle();
verify(mockCM).startContainers(any(StartContainersRequest.class));
// skip cleanup and make sure stop kills the container
} finally {
ut.stop();
verify(mockCM).stopContainers(any(StopContainersRequest.class));
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test(timeout = 5000)
public void testContainerCleaned() throws Exception {
LOG.info("STARTING testContainerCleaned");
CyclicBarrier startLaunchBarrier = new CyclicBarrier(2);
CyclicBarrier completeLaunchBarrier = new CyclicBarrier(2);
AppContext mockContext = mock(AppContext.class);
EventHandler mockEventHandler = mock(EventHandler.class);
when(mockContext.getEventHandler()).thenReturn(mockEventHandler);
ContainerManagementProtocolClient mockCM =
new ContainerManagerForTest(startLaunchBarrier, completeLaunchBarrier);
ContainerLauncherImplUnderTest ut =
new ContainerLauncherImplUnderTest(mockContext, mockCM);
Configuration conf = new Configuration();
ut.init(conf);
ut.start();
try {
ContainerId contId = makeContainerId(0l, 0, 0, 1);
TaskAttemptId taskAttemptId = makeTaskAttemptId(0l, 0, 0, TaskType.MAP, 0);
String cmAddress = "127.0.0.1:8000";
StartContainersResponse startResp =
recordFactory.newRecordInstance(StartContainersResponse.class);
startResp.setAllServicesMetaData(serviceResponse);
LOG.info("inserting launch event");
ContainerRemoteLaunchEvent mockLaunchEvent =
mock(ContainerRemoteLaunchEvent.class);
when(mockLaunchEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_LAUNCH);
when(mockLaunchEvent.getContainerID())
.thenReturn(contId);
when(mockLaunchEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockLaunchEvent.getContainerMgrAddress()).thenReturn(cmAddress);
when(mockLaunchEvent.getContainerToken()).thenReturn(
createNewContainerToken(contId, cmAddress));
ut.handle(mockLaunchEvent);
startLaunchBarrier.await();
LOG.info("inserting cleanup event");
ContainerLauncherEvent mockCleanupEvent =
mock(ContainerLauncherEvent.class);
when(mockCleanupEvent.getType())
.thenReturn(EventType.CONTAINER_REMOTE_CLEANUP);
when(mockCleanupEvent.getContainerID())
.thenReturn(contId);
when(mockCleanupEvent.getTaskAttemptID()).thenReturn(taskAttemptId);
when(mockCleanupEvent.getContainerMgrAddress()).thenReturn(cmAddress);
ut.handle(mockCleanupEvent);
completeLaunchBarrier.await();
ut.waitForPoolToIdle();
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
verify(mockEventHandler, atLeast(2)).handle(arg.capture());
boolean containerCleaned = false;
for (int i =0; i < arg.getAllValues().size(); i++) {
LOG.info(arg.getAllValues().get(i).toString());
Event currentEvent = arg.getAllValues().get(i);
if (currentEvent.getType() == TaskAttemptEventType.TA_CONTAINER_CLEANED) {
containerCleaned = true;
}
}
assert(containerCleaned);
} finally {
ut.stop();
}
}
private Token createNewContainerToken(ContainerId contId,
String containerManagerAddr) {
long currentTime = System.currentTimeMillis();
return MRApp.newContainerToken(NodeId.newInstance("127.0.0.1",
1234), "password".getBytes(), new ContainerTokenIdentifier(
contId, containerManagerAddr, "user",
Resource.newInstance(1024, 1),
currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
}
private static class ContainerManagerForTest implements ContainerManagementProtocolClient {
private CyclicBarrier startLaunchBarrier;
private CyclicBarrier completeLaunchBarrier;
ContainerManagerForTest (CyclicBarrier startLaunchBarrier, CyclicBarrier completeLaunchBarrier) {
this.startLaunchBarrier = startLaunchBarrier;
this.completeLaunchBarrier = completeLaunchBarrier;
}
@Override
public StartContainersResponse startContainers(StartContainersRequest request)
throws IOException {
try {
startLaunchBarrier.await();
completeLaunchBarrier.await();
//To ensure the kill is started before the launch
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
throw new IOException(new ContainerException("Force fail CM"));
}
@Override
public StopContainersResponse stopContainers(StopContainersRequest request)
throws IOException {
return null;
}
@Override
public GetContainerStatusesResponse getContainerStatuses(
GetContainerStatusesRequest request) throws IOException {
return null;
}
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest request) throws YarnException,
IOException {
return null;
}
@Override
public void close() throws IOException {
}
}
@SuppressWarnings("serial")
private static class ContainerException extends YarnException {
public ContainerException(String message) {
super(message);
}
@Override
public YarnException getCause() {
return null;
}
}
}