| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.app.rm; |
| |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.assertEquals; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Mockito.doReturn; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.serviceplugins.api.TaskScheduler; |
| import org.mockito.ArgumentCaptor; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.hadoop.yarn.api.records.LocalResource; |
| import org.apache.hadoop.yarn.api.records.NodeId; |
| import org.apache.hadoop.yarn.api.records.Priority; |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.hadoop.yarn.client.api.AMRMClient; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.tez.common.MockDNSToSwitchMapping; |
| import org.apache.tez.dag.api.InputDescriptor; |
| import org.apache.tez.dag.api.OutputDescriptor; |
| import org.apache.tez.dag.api.ProcessorDescriptor; |
| import org.apache.tez.serviceplugins.api.TaskAttemptEndReason; |
| import org.apache.tez.dag.api.TezConfiguration; |
| import org.apache.tez.dag.api.TaskLocationHint; |
| import org.apache.tez.dag.api.oldrecords.TaskAttemptState; |
| import org.apache.tez.dag.app.AppContext; |
| import org.apache.tez.dag.app.ClusterInfo; |
| import org.apache.tez.dag.app.ContainerContext; |
| import org.apache.tez.dag.app.ContainerHeartbeatHandler; |
| import org.apache.tez.dag.app.DAGAppMasterState; |
| import org.apache.tez.dag.app.TaskCommunicatorManagerInterface; |
| import org.apache.tez.dag.app.dag.TaskAttempt; |
| import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerManagerForTest; |
| import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext; |
| import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; |
| import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest; |
| import org.apache.tez.dag.app.rm.container.AMContainerMap; |
| import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; |
| import org.apache.tez.dag.app.rm.node.AMNodeTracker; |
| import org.apache.tez.dag.records.TezDAGID; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.dag.records.TezTaskID; |
| import org.apache.tez.dag.records.TezVertexID; |
| import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption; |
| import org.apache.tez.runtime.api.impl.InputSpec; |
| import org.apache.tez.runtime.api.impl.OutputSpec; |
| import org.apache.tez.runtime.api.impl.TaskSpec; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.stubbing.Answer; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| |
| public class TestContainerReuse { |
| private static final Logger LOG = LoggerFactory.getLogger(TestContainerReuse.class); |
| |
| /** |
| * One-time fixture for this class: calls |
| * {@link MockDNSToSwitchMapping#initializeMockRackResolver()} before any test runs. |
| * NOTE(review): presumably this is what lets the test hosts resolve to "/default-rack" |
| * without a real topology script - confirm against MockDNSToSwitchMapping. |
| */ |
| @BeforeClass |
| public static void setup() { |
| MockDNSToSwitchMapping.initializeMockRackResolver(); |
| } |
| |
| @Test(timeout = 15000l) |
| public void testDelayedReuseContainerBecomesAvailable() |
| throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testDelayedReuseContainerBecomesAvailable"); |
| Configuration conf = new Configuration(new YarnConfiguration()); |
| conf.setBoolean( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| conf.setBoolean( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); |
| conf.setBoolean( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); |
| conf.setLong( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 3000l); |
| conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); |
| conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = |
| spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(conf).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest( |
| appContext, eventHandler, rmClient, |
| new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); |
| TaskSchedulerManager taskSchedulerManager = |
| spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(conf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = |
| (TaskSchedulerWithDrainableContext) |
| ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = |
| taskScheduler.getDrainableAppCallback(); |
| |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource = Resource.newInstance(1024, 1); |
| Priority priority = Priority.newInstance(5); |
| String [] host1 = {"host1"}; |
| String [] host2 = {"host2"}; |
| |
| String [] defaultRack = {"/default-rack"}; |
| |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexID, 1), 1); |
| TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexID, 2), 1); |
| TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexID, 3), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| TaskAttempt ta21 = mock(TaskAttempt.class); |
| TaskAttempt ta31 = mock(TaskAttempt.class); |
| |
| AMSchedulerEventTALaunchRequest lrTa11 = |
| createLaunchRequestEvent(taID11, ta11, resource, host1, |
| defaultRack, priority); |
| AMSchedulerEventTALaunchRequest lrTa21 = |
| createLaunchRequestEvent(taID21, ta21, resource, host2, |
| defaultRack, priority); |
| AMSchedulerEventTALaunchRequest lrTa31 = |
| createLaunchRequestEvent(taID31, ta31, resource, host1, |
| defaultRack, priority); |
| |
| taskSchedulerManager.handleEvent(lrTa11); |
| taskSchedulerManager.handleEvent(lrTa21); |
| |
| Container containerHost1 = createContainer(1, host1[0], resource, priority); |
| Container containerHost2 = createContainer(2, host2[0], resource, priority); |
| |
| taskScheduler.onContainersAllocated( |
| Lists.newArrayList(containerHost1, containerHost2)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta11), any(Object.class), eq(containerHost1)); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta21), any(Object.class), eq(containerHost2)); |
| |
| // Adding the event later so that task1 assigned to containerHost1 |
| // is deterministic. |
| taskSchedulerManager.handleEvent(lrTa31); |
| |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded( |
| ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta11, true, null, null); |
| verify(taskSchedulerManager, times(1)).taskAllocated( |
| eq(0), eq(ta31), any(Object.class), eq(containerHost1)); |
| verify(rmClient, times(0)).releaseAssignedContainer( |
| eq(containerHost1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| |
| long currentTs = System.currentTimeMillis(); |
| Throwable exception = null; |
| while (System.currentTimeMillis() < currentTs + 5000l) { |
| try { |
| verify(taskSchedulerManager, |
| times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId())); |
| exception = null; |
| break; |
| } catch (Throwable e) { |
| exception = e; |
| } |
| } |
| assertTrue("containerHost2 was not released", exception == null); |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 15000l) |
| public void testDelayedReuseContainerNotAvailable() |
| throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testDelayedReuseContainerNotAvailable"); |
| Configuration conf = new Configuration(new YarnConfiguration()); |
| conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, false); |
| conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, false); |
| conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1000l); |
| conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); |
| conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = |
| spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, |
| new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf)); |
| TaskSchedulerManager taskSchedulerManager = |
| spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(conf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = |
| (TaskSchedulerWithDrainableContext) |
| ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = |
| taskScheduler.getDrainableAppCallback(); |
| |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource = Resource.newInstance(1024, 1); |
| Priority priority = Priority.newInstance(5); |
| String [] host1 = {"host1"}; |
| String [] host2 = {"host2"}; |
| |
| String [] defaultRack = {"/default-rack"}; |
| |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1); |
| TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1); |
| TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 3), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| TaskAttempt ta21 = mock(TaskAttempt.class); |
| TaskAttempt ta31 = mock(TaskAttempt.class); |
| |
| AMSchedulerEventTALaunchRequest lrTa11 = createLaunchRequestEvent( |
| taID11, ta11, resource, host1, defaultRack, priority); |
| AMSchedulerEventTALaunchRequest lrTa21 = createLaunchRequestEvent( |
| taID21, ta21, resource, host2, defaultRack, priority); |
| AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent( |
| taID31, ta31, resource, host1, defaultRack, priority); |
| |
| taskSchedulerManager.handleEvent(lrTa11); |
| taskSchedulerManager.handleEvent(lrTa21); |
| |
| Container containerHost1 = createContainer(1, host1[0], resource, priority); |
| Container containerHost2 = createContainer(2, host2[0], resource, priority); |
| |
| taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1)); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta21), any(Object.class), |
| eq(containerHost2)); |
| |
| // Adding the event later so that task1 assigned to containerHost1 is deterministic. |
| taskSchedulerManager.handleEvent(lrTa31); |
| |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta21, true, null, null); |
| verify(taskSchedulerManager, times(0)).taskAllocated( |
| eq(0), eq(ta31), any(Object.class), eq(containerHost2)); |
| verify(rmClient, times(1)).releaseAssignedContainer( |
| eq(containerHost2.getId())); |
| eventHandler.verifyInvocation(AMContainerEventStopRequest.class); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| /** |
| * Walks one container through the basic reuse life cycle. |
| * <p> |
| * With reuse and rack fallback enabled, no locality delay and idle-release timeouts of 0, |
| * four tasks are requested (two for host1, two for host2). The test checks that a container |
| * is reused after a successful task on the same host, reused across hosts, released after a |
| * failed task, and released once a successful task leaves no pending requests. |
| */ |
| @Test(timeout = 10000L) |
| public void testSimpleReuse() throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testSimpleReuse"); |
| |
| Configuration reuseConf = new Configuration(new YarnConfiguration()); |
| reuseConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| reuseConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| reuseConf.setLong( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); |
| reuseConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); |
| reuseConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); |
| |
| CapturingEventHandler handler = new CapturingEventHandler(); |
| TezDAGID dagId = TezDAGID.getInstance("0", 0, 0); |
| |
| AMRMClient<CookieContainerRequest> coreClient = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> asyncClient = |
| spy(new AMRMClientAsyncForTest(coreClient, 100)); |
| |
| // Mocked application context backed by a real container map and node tracker. |
| AppContext ctx = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(ctx).getAMConf(); |
| AMContainerMap containers = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), |
| ctx); |
| AMNodeTracker nodeTracker = new AMNodeTracker(handler, ctx); |
| doReturn(containers).when(ctx).getAllContainers(); |
| doReturn(nodeTracker).when(ctx).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(ctx).getAMState(); |
| doReturn(dagId).when(ctx).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(ctx).getClusterInfo(); |
| |
| // Spy on the manager so allocation callbacks can be verified. |
| TaskSchedulerManager manager = spy( |
| (TaskSchedulerManager) new TaskSchedulerManagerForTest( |
| ctx, handler, asyncClient, |
| new AlwaysMatchesContainerMatcher(), |
| TezUtils.createUserPayloadFromConf(reuseConf))); |
| manager.init(reuseConf); |
| manager.start(); |
| |
| TaskSchedulerManagerForTest managerForTest = (TaskSchedulerManagerForTest) manager; |
| TaskSchedulerWithDrainableContext scheduler = |
| (TaskSchedulerWithDrainableContext) managerForTest.getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable appCallback = scheduler.getDrainableAppCallback(); |
| AtomicBoolean drained = new AtomicBoolean(false); |
| scheduler.delayedContainerManager.drainedDelayedContainersForTest = drained; |
| |
| Resource capability = Resource.newInstance(1024, 1); |
| String[] onHost1 = {"host1"}; |
| String[] onHost2 = {"host2"}; |
| String[] defaultRack = {"/default-rack"}; |
| Priority pri = Priority.newInstance(1); |
| |
| TezVertexID vertexId = TezVertexID.getInstance(dagId, 1); |
| |
| // Vertex 1, task 1, attempt 1 -> host1 |
| TezTaskAttemptID attemptId1 = |
| TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 1), 1); |
| TaskAttempt firstOnHost1 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest launch1 = createLaunchRequestEvent( |
| attemptId1, firstOnHost1, capability, onHost1, defaultRack, pri); |
| |
| // Vertex 1, task 2, attempt 1 -> host1 |
| TezTaskAttemptID attemptId2 = |
| TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 2), 1); |
| TaskAttempt secondOnHost1 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest launch2 = createLaunchRequestEvent( |
| attemptId2, secondOnHost1, capability, onHost1, defaultRack, pri); |
| |
| // Vertex 1, task 3, attempt 1 -> host2 |
| TezTaskAttemptID attemptId3 = |
| TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 3), 1); |
| TaskAttempt firstOnHost2 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest launch3 = createLaunchRequestEvent( |
| attemptId3, firstOnHost2, capability, onHost2, defaultRack, pri); |
| |
| // Vertex 1, task 4, attempt 1 -> host2 |
| TezTaskAttemptID attemptId4 = |
| TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 4), 1); |
| TaskAttempt secondOnHost2 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest launch4 = createLaunchRequestEvent( |
| attemptId4, secondOnHost2, capability, onHost2, defaultRack, pri); |
| |
| manager.handleEvent(launch1); |
| manager.handleEvent(launch2); |
| manager.handleEvent(launch3); |
| manager.handleEvent(launch4); |
| |
| // A single host1 container arrives and must go to the first host1 task. |
| Container host1Container = createContainer(1, "host1", capability, pri); |
| scheduler.onContainersAllocated(Collections.singletonList(host1Container)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drained); |
| appCallback.drain(); |
| verify(manager, times(1)).taskAllocated( |
| eq(0), eq(firstOnHost1), any(Object.class), eq(host1Container)); |
| |
| // First task succeeds: the container is reused for the second host1 task. |
| manager.handleEvent(new AMSchedulerEventTAEnded( |
| firstOnHost1, host1Container.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); |
| appCallback.drain(); |
| verifyDeAllocateTask(scheduler, firstOnHost1, true, null, null); |
| verify(manager, times(1)).taskAllocated( |
| eq(0), eq(secondOnHost1), any(Object.class), eq(host1Container)); |
| verify(asyncClient, times(0)).releaseAssignedContainer(eq(host1Container.getId())); |
| handler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| handler.reset(); |
| |
| // Second task succeeds: the same container is reused across hosts for a host2 task. |
| manager.handleEvent(new AMSchedulerEventTAEnded( |
| secondOnHost1, host1Container.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); |
| appCallback.drain(); |
| verifyDeAllocateTask(scheduler, secondOnHost1, true, null, null); |
| verify(manager, times(1)).taskAllocated( |
| eq(0), eq(firstOnHost2), any(Object.class), eq(host1Container)); |
| verify(asyncClient, times(0)).releaseAssignedContainer(eq(host1Container.getId())); |
| handler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| handler.reset(); |
| |
| // Third task fails: no further reuse, the container is released and stopped. |
| manager.handleEvent(new AMSchedulerEventTAEnded( |
| firstOnHost2, host1Container.getId(), TaskAttemptState.FAILED, null, "TIMEOUT", 0)); |
| appCallback.drain(); |
| verify(manager, times(0)).taskAllocated( |
| eq(0), eq(secondOnHost2), any(Object.class), eq(host1Container)); |
| verifyDeAllocateTask(scheduler, firstOnHost2, false, null, "TIMEOUT"); |
| verify(asyncClient, times(1)).releaseAssignedContainer(eq(host1Container.getId())); |
| handler.verifyInvocation(AMContainerEventStopRequest.class); |
| handler.reset(); |
| |
| // A second container arrives and must go to the last remaining task. |
| Container host2Container = createContainer(2, "host2", capability, pri); |
| scheduler.onContainersAllocated(Collections.singletonList(host2Container)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drained); |
| appCallback.drain(); |
| verify(manager, times(1)).taskAllocated( |
| eq(0), eq(secondOnHost2), any(Object.class), eq(host2Container)); |
| |
| // Last task succeeds with nothing pending: the container is released. |
| manager.handleEvent(new AMSchedulerEventTAEnded( |
| secondOnHost2, host2Container.getId(), TaskAttemptState.SUCCEEDED, null, null, 0)); |
| appCallback.drain(); |
| verifyDeAllocateTask(scheduler, secondOnHost2, true, null, null); |
| verify(asyncClient, times(1)).releaseAssignedContainer(eq(host2Container.getId())); |
| handler.verifyInvocation(AMContainerEventStopRequest.class); |
| handler.reset(); |
| |
| scheduler.shutdown(); |
| manager.close(); |
| } |
| |
| /** |
| * Checks how task-specific launch command options affect container reuse. |
| * <p> |
| * A real {@link ContainerContextMatcher} is used so that launch options take part in the |
| * match. Three scenarios run in sequence, each with its own container: |
| * <ol> |
| * <li>a task with a task-specific option followed by a task without one - no reuse;</li> |
| * <li>two tasks whose task-specific options are requested for different task indices of a |
| * per-task template - no reuse;</li> |
| * <li>two tasks (5 and 6) that both receive the same configured option - the container is |
| * reused.</li> |
| * </ol> |
| * |
| * @throws IOException propagated from the scheduler and helper calls made by the test |
| * @throws InterruptedException if the test thread is interrupted while waiting |
| * @throws ExecutionException propagated from the scheduler and helper calls made by the test |
| */ |
| @Test(timeout = 10000L) |
| public void testReuseWithTaskSpecificLaunchCmdOption() throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testReuseWithTaskSpecificLaunchCmdOption"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); |
| //Profile 3 tasks |
| tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST, "v1[1,3,4]"); |
| tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dir=/tmp/__VERTEX_NAME__/__TASK_INDEX__"); |
| TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(tezConf); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| // Mocked application context wired with a real container map and node tracker. |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| //Use ContainerContextMatcher here. Otherwise it would not match the JVM options |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = |
| (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String[] host1 = {"host1"}; |
| String[] host2 = {"host2"}; |
| String[] host3 = {"host3"}; |
| |
| String []racks = {"/default-rack"}; |
| Priority priority1 = Priority.newInstance(1); |
| |
| TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1); |
| Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); |
| String tsLaunchCmdOpts = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 1); |
| |
| // Scenario 1: schedule 2 tasks (1 with an additional launch-cmd option and another in |
| // normal mode). The container should not be reused in this case. |
| //Vertex 1, Task 1, Attempt 1, host1 |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent1 = |
| createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, |
| priority1, localResources, tsLaunchCmdOpts); |
| |
| //Vertex 1, Task 2, Attempt 1, host1 |
| TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1); |
| TaskAttempt ta12 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent2 = |
| createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1); |
| |
| taskSchedulerManager.handleEvent(lrEvent1); |
| taskSchedulerManager.handleEvent(lrEvent2); |
| |
| Container container1 = createContainer(1, "host1", resource1, priority1); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), |
| eq(container1)); |
| |
| // First task had profiling on. This container can not be reused further. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta11, true, null, null); |
| verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class), |
| eq(container1)); |
| verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyInvocation(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| // Scenario 2: schedule 2 tasks (both having a different task-specific JVM option). |
| // The container should not be reused. |
| //Vertex 1, Task 3, Attempt 1, host2 |
| tsLaunchCmdOpts = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 3); |
| TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1); |
| TaskAttempt ta13 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent3 = |
| createLaunchRequestEvent(taID13, ta13, resource1, host2, racks, |
| priority1, localResources, tsLaunchCmdOpts); |
| |
| //Vertex 1, Task 4, Attempt 1, host2 |
| tsLaunchCmdOpts = taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 4); |
| TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1); |
| TaskAttempt ta14 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent4 = |
| createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, |
| priority1, localResources, tsLaunchCmdOpts); |
| |
| taskSchedulerManager.handleEvent(lrEvent3); |
| taskSchedulerManager.handleEvent(lrEvent4); |
| |
| // Container started |
| Container container2 = createContainer(2, "host2", resource1, priority1); |
| taskScheduler.onContainersAllocated(Collections.singletonList(container2)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2)); |
| |
| // Verify that the container is not reused when the two tasks have different |
| // task-specific options. Reuse between tasks that share the same option is |
| // exercised by the third scenario. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta13, true, null, null); |
| verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class), |
| eq(container2)); |
| verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId())); |
| eventHandler.verifyInvocation(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| // Scenario 3: schedule 2 tasks with the same jvm profiling option. |
| // The container should be reused. |
| tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS_LIST, "v1[1,2,3,5,6]"); |
| tezConf.set(TezConfiguration.TEZ_TASK_SPECIFIC_LAUNCH_CMD_OPTS, "dummyOpts"); |
| taskSpecificLaunchCmdOption = new TaskSpecificLaunchCmdOption(tezConf); |
| |
| // The attempt ids below use task indices 5 and 6 so that they agree with the indices |
| // passed to getTaskSpecificOption and do not duplicate the ids of tasks 3 and 4 above. |
| //Vertex 1, Task 5, Attempt 1, host3 |
| TezTaskAttemptID taID15 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 5), 1); |
| TaskAttempt ta15 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent5 = |
| createLaunchRequestEvent(taID15, ta15, resource1, host3, racks, |
| priority1, localResources, |
| taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 5)); |
| |
| //Vertex 1, Task 6, Attempt 1, host3 |
| TezTaskAttemptID taID16 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 6), 1); |
| TaskAttempt ta16 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent6 = |
| createLaunchRequestEvent(taID16, ta16, resource1, host3, racks, |
| priority1, localResources, taskSpecificLaunchCmdOption.getTaskSpecificOption("", "v1", 6)); |
| |
| // Container started |
| Container container3 = createContainer(3, "host3", resource1, priority1); |
| taskSchedulerManager.handleEvent(lrEvent5); |
| taskSchedulerManager.handleEvent(lrEvent6); |
| |
| taskScheduler.onContainersAllocated(Collections.singletonList(container3)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta15), any(Object.class), |
| eq(container3)); |
| |
| //Ensure task 6 (of vertex 1) is allocated to same container |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta15, true, null, null); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3)); |
| eventHandler.reset(); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 30000l) |
| public void testReuseNonLocalRequest() |
| throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testReuseNonLocalRequest"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 100l); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 1000l); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 1000l); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = |
| spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest( |
| appContext, eventHandler, rmClient, |
| new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = |
| spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = |
| (TaskSchedulerWithDrainableContext) |
| ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = |
| taskScheduler.getDrainableAppCallback(); |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String [] emptyHosts = new String[0]; |
| String [] racks = { "default-rack" }; |
| |
| Priority priority = Priority.newInstance(3); |
| |
| TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); |
| |
| //Vertex 1, Task 1, Attempt 1, no locality information. |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 1), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| doReturn(vertexID).when(ta11).getVertexID(); |
| AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent( |
| taID11, ta11, resource1, emptyHosts, racks, priority); |
| |
| //Vertex1, Task2, Attempt 1, no locality information. |
| TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID, 2), 1); |
| TaskAttempt ta12 = mock(TaskAttempt.class); |
| doReturn(vertexID).when(ta12).getVertexID(); |
| AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent( |
| taID12, ta12, resource1, emptyHosts, racks, priority); |
| |
| // Send launch request for task 1 only, deterministic assignment to this task. |
| taskSchedulerManager.handleEvent(lrEvent11); |
| |
| Container container1 = createContainer(1, "randomHost", resource1, priority); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta11), any(Object.class), eq(container1)); |
| |
| // Send launch request for task2 (vertex2) |
| taskSchedulerManager.handleEvent(lrEvent12); |
| |
| // Task assigned to container completed successfully. |
| // Container should not be immediately assigned to task 2 |
| // until delay expires. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta11, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta11, true, null, null); |
| verify(taskSchedulerManager, times(0)).taskAllocated( |
| eq(0), eq(ta12), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta12), any(Object.class), eq(container1)); |
| |
| // TA12 completed. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta12, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| |
| verify(rmClient).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyInvocation(AMContainerEventStopRequest.class); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 30000l) |
| public void testReuseAcrossVertices() |
| throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testReuseAcrossVertices"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| tezConf.setLong( |
| TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 1l); |
| tezConf.setLong( |
| TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 20l); |
| tezConf.setLong( |
| TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 30l); |
| tezConf.setInt( |
| TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, 1); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = |
| spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap( |
| mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), |
| new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(true).when(appContext).isSession(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, |
| new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = |
| spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = |
| (TaskSchedulerWithDrainableContext) |
| ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); |
| |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String[] host1 = {"host1"}; |
| |
| String []racks = {"/default-rack"}; |
| Priority priority1 = Priority.newInstance(3); |
| Priority priority2 = Priority.newInstance(4); |
| |
| TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1); |
| TezVertexID vertexID2 = TezVertexID.getInstance(dagID, 2); |
| |
| //Vertex 1, Task 1, Attempt 1, host1 |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexID1, 1), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| doReturn(vertexID1).when(ta11).getVertexID(); |
| AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent( |
| taID11, ta11, resource1, host1, racks, priority1); |
| |
| //Vertex2, Task1, Attempt 1, host1 |
| TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance( |
| TezTaskID.getInstance(vertexID2, 1), 1); |
| TaskAttempt ta21 = mock(TaskAttempt.class); |
| doReturn(vertexID2).when(ta21).getVertexID(); |
| AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent( |
| taID21, ta21, resource1, host1, racks, priority2); |
| |
| // Send launch request for task 1 onle, deterministic assignment to this task. |
| taskSchedulerManager.handleEvent(lrEvent11); |
| |
| Container container1 = createContainer(1, host1[0], resource1, priority1); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta11), any(Object.class), eq(container1)); |
| |
| // Send launch request for task2 (vertex2) |
| taskSchedulerManager.handleEvent(lrEvent21); |
| |
| // Task assigned to container completed successfully. |
| // Container should be assigned to task21. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta11, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta11, true, null, null); |
| verify(taskSchedulerManager).taskAllocated( |
| eq(0), eq(ta21), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| |
| // Task 2 completes. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta21, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| |
| LOG.info("Sleeping to ensure that the container has been idled longer " + |
| "than TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS "); |
| Thread.sleep(50l); |
| // container should not get released due to min held containers |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 30000l) |
| public void testReuseLocalResourcesChanged() throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testReuseLocalResourcesChanged"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); |
| AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(true).when(appContext).isSession(); |
| doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager |
| taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String[] host1 = {"host1"}; |
| |
| String []racks = {"/default-rack"}; |
| Priority priority1 = Priority.newInstance(1); |
| |
| String rsrc1 = "rsrc1"; |
| String rsrc2 = "rsrc2"; |
| String rsrc3 = "rsrc3"; |
| LocalResource lr1 = mock(LocalResource.class); |
| LocalResource lr2 = mock(LocalResource.class); |
| LocalResource lr3 = mock(LocalResource.class); |
| |
| AMContainerEventAssignTA assignEvent = null; |
| |
| Map<String, LocalResource> dag1LRs = Maps.newHashMap(); |
| dag1LRs.put(rsrc1, lr1); |
| |
| TezVertexID vertexID11 = TezVertexID.getInstance(dagID1, 1); |
| |
| //Vertex 1, Task 1, Attempt 1, host1, lr1 |
| TezTaskAttemptID taID111 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 1), 1); |
| TaskAttempt ta111 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, dag1LRs); |
| |
| //Vertex 1, Task 2, Attempt 1, host1, lr1 |
| TezTaskAttemptID taID112 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 2), 1); |
| TaskAttempt ta112 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs); |
| |
| taskSchedulerManager.handleEvent(lrEvent11); |
| taskSchedulerManager.handleEvent(lrEvent12); |
| |
| Container container1 = createContainer(1, "host1", resource1, priority1); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); |
| |
| // Task assigned to container completed successfully. Container should be re-used. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta111, true, null, null); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); |
| eventHandler.reset(); |
| |
| // Task assigned to container completed successfully. |
| // Verify reuse across hosts. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta112, true, null, null); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| // Setup DAG2 with additional resources. Make sure the container, even without all resources, is reused. |
| TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0); |
| dagIDAnswer.setDAGID(dagID2); |
| |
| Map<String, LocalResource> dag2LRs = Maps.newHashMap(); |
| dag2LRs.put(rsrc2, lr2); |
| dag2LRs.put(rsrc3, lr3); |
| |
| TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1); |
| |
| //Vertex 2, Task 1, Attempt 1, host1, lr2 |
| TezTaskAttemptID taID211 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID21, 1), 1); |
| TaskAttempt ta211 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1, host1, racks, priority1, dag2LRs); |
| |
| //Vertex 2, Task 2, Attempt 1, host1, lr2 |
| TezTaskAttemptID taID212 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID21, 2), 1); |
| TaskAttempt ta212 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent22 = createLaunchRequestEvent(taID212, ta212, resource1, host1, racks, priority1, dag2LRs); |
| |
| taskSchedulerManager.handleEvent(lrEvent21); |
| taskSchedulerManager.handleEvent(lrEvent22); |
| |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), |
| eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); |
| eventHandler.reset(); |
| |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta211, true, null, null); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(2, assignEvent.getRemoteTaskLocalResources().size()); |
| eventHandler.reset(); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 30000l) |
| public void testReuseConflictLocalResources() throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testReuseLocalResourcesChanged"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED, true); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, -1); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID1 = TezDAGID.getInstance("0", 1, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1); |
| AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState(); |
| doReturn(true).when(appContext).isSession(); |
| doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, |
| new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String[] host1 = {"host1"}; |
| |
| String []racks = {"/default-rack"}; |
| Priority priority1 = Priority.newInstance(1); |
| |
| String rsrc1 = "rsrc1"; |
| String rsrc2 = "rsrc2"; |
| LocalResource lr1 = mock(LocalResource.class); |
| LocalResource lr2 = mock(LocalResource.class); |
| LocalResource lr3 = mock(LocalResource.class); |
| |
| AMContainerEventAssignTA assignEvent = null; |
| |
| Map<String, LocalResource> v11LR = Maps.newHashMap(); |
| v11LR.put(rsrc1, lr1); |
| |
| TezVertexID vertexID11 = TezVertexID.getInstance(dagID1, 1); |
| |
| //Vertex 1, Task 1, Attempt 1, host1, lr1 |
| TezTaskAttemptID taID111 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 1), 1); |
| TaskAttempt ta111 = mock(TaskAttempt.class); |
| doReturn(taID111).when(ta111).getID(); |
| doReturn("Mock for TA " + taID111.toString()).when(ta111).toString(); |
| AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent( |
| taID111, ta111, resource1, host1, racks, priority1, v11LR); |
| |
| Map<String, LocalResource> v12LR = Maps.newHashMap(); |
| v12LR.put(rsrc1, lr1); |
| v12LR.put(rsrc2, lr2); |
| |
| //Vertex 1, Task 2, Attempt 1, host1, lr1 |
| TezTaskAttemptID taID112 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 2), 1); |
| TaskAttempt ta112 = mock(TaskAttempt.class); |
| doReturn(taID112).when(ta112).getID(); |
| doReturn("Mock for TA " + taID112.toString()).when(ta112).toString(); |
| AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent( |
| taID112, ta112, resource1, host1, racks, priority1, v12LR); |
| |
| //Vertex 1, Task 3, Attempt 1, host1 |
| TezTaskAttemptID taID113 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 3), 1); |
| TaskAttempt ta113 = mock(TaskAttempt.class); |
| doReturn(taID113).when(ta113).getID(); |
| doReturn("Mock for TA " + taID113.toString()).when(ta113).toString(); |
| AMSchedulerEventTALaunchRequest lrEvent13 = createLaunchRequestEvent( |
| taID113, ta113, resource1, host1, racks, priority1, new HashMap<String, LocalResource>()); |
| //Vertex 1, Task 4, Attempt 1, host1 |
| TezTaskAttemptID taID114 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID11, 4), 1); |
| TaskAttempt ta114 = mock(TaskAttempt.class); |
| doReturn(taID114).when(ta114).getID(); |
| doReturn("Mock for TA " + taID114.toString()).when(ta114).toString(); |
| AMSchedulerEventTALaunchRequest lrEvent14 = createLaunchRequestEvent( |
| taID114, ta114, resource1, host1, racks, priority1, new HashMap<String, LocalResource>()); |
| |
| |
| taskSchedulerManager.handleEvent(lrEvent11); |
| taskSchedulerManager.handleEvent(lrEvent12); |
| |
| Container container1 = createContainer(1, "host1", resource1, priority1); |
| Container container2 = createContainer(2, "host1", resource1, priority1); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1)); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); |
| |
| // Task assigned to container completed successfully. Container should be re-used. |
| taskSchedulerManager.handleEvent( |
| new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null, |
| null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta111, true, null, null); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class); |
| assertEquals(1, assignEvent.getRemoteTaskLocalResources().size()); |
| eventHandler.reset(); |
| |
| // Task assigned to container completed successfully. |
| // Verify reuse across hosts. |
| taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta112, true, null, null); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| // Task 3 |
| taskSchedulerManager.handleEvent(lrEvent13); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta113), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta113, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta113, true, null, null); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| // Task 4 |
| taskSchedulerManager.handleEvent(lrEvent14); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta114), any(Object.class), eq(container1)); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta114, container1.getId(), |
| TaskAttemptState.SUCCEEDED, null, null, 0)); |
| drainableAppCallback.drain(); |
| verifyDeAllocateTask(taskScheduler, ta114, true, null, null); |
| verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId())); |
| eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class); |
| eventHandler.reset(); |
| |
| |
| // Setup DAG2 with different resources. |
| TezDAGID dagID2 = TezDAGID.getInstance("0", 2, 0); |
| dagIDAnswer.setDAGID(dagID2); |
| |
| Map<String, LocalResource> v21LR = Maps.newHashMap(); |
| v21LR.put(rsrc1, lr1); |
| v21LR.put(rsrc2, lr3); |
| |
| TezVertexID vertexID21 = TezVertexID.getInstance(dagID2, 1); |
| |
| //Vertex 2, Task 1, Attempt 1, host1, lr2 |
| TezTaskAttemptID taID211 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID21, 1), 1); |
| TaskAttempt ta211 = mock(TaskAttempt.class); |
| doReturn(taID211).when(ta211).getID(); |
| doReturn("Mock for TA " + taID211.toString()).when(ta211).toString(); |
| AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1, |
| host1, racks, priority1, v21LR); |
| |
| taskSchedulerManager.handleEvent(lrEvent21); |
| |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| |
| verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId())); |
| |
| taskScheduler.onContainersAllocated(Collections.singletonList(container2)); |
| TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2)); |
| eventHandler.reset(); |
| |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| @Test(timeout = 10000l) |
| public void testAssignmentOnShutdown() |
| throws IOException, InterruptedException, ExecutionException { |
| LOG.info("Test testAssignmentOnShutdown"); |
| Configuration tezConf = new Configuration(new YarnConfiguration()); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false); |
| tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0); |
| tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0); |
| |
| CapturingEventHandler eventHandler = new CapturingEventHandler(); |
| TezDAGID dagID = TezDAGID.getInstance("0", 0, 0); |
| |
| AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest(); |
| TezAMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100)); |
| |
| AppContext appContext = mock(AppContext.class); |
| doReturn(new Configuration(false)).when(appContext).getAMConf(); |
| AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), |
| mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext); |
| AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext); |
| doReturn(amContainerMap).when(appContext).getAllContainers(); |
| doReturn(amNodeTracker).when(appContext).getNodeTracker(); |
| doReturn(DAGAppMasterState.SUCCEEDED).when(appContext).getAMState(); |
| doReturn(true).when(appContext).isAMInCompletionState(); |
| doReturn(dagID).when(appContext).getCurrentDAGID(); |
| doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo(); |
| |
| TaskSchedulerManager taskSchedulerManagerReal = |
| new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, |
| new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf)); |
| TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal); |
| taskSchedulerManager.init(tezConf); |
| taskSchedulerManager.start(); |
| |
| TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) |
| ((TaskSchedulerManagerForTest) taskSchedulerManager) |
| .getSpyTaskScheduler(); |
| TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback(); |
| AtomicBoolean drainNotifier = new AtomicBoolean(false); |
| taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier; |
| |
| Resource resource1 = Resource.newInstance(1024, 1); |
| String[] host1 = {"host1"}; |
| |
| String []racks = {"/default-rack"}; |
| Priority priority1 = Priority.newInstance(1); |
| |
| TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1); |
| |
| //Vertex 1, Task 1, Attempt 1, host1 |
| TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1); |
| TaskAttempt ta11 = mock(TaskAttempt.class); |
| AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1, |
| host1, racks, priority1); |
| taskSchedulerManager.handleEvent(lrEvent1); |
| |
| Container container1 = createContainer(1, "host1", resource1, priority1); |
| |
| // One container allocated. |
| taskScheduler.onContainersAllocated(Collections.singletonList(container1)); |
| drainableAppCallback.drain(); |
| verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11), |
| any(Object.class), eq(container1)); |
| taskScheduler.shutdown(); |
| taskSchedulerManager.close(); |
| } |
| |
| private Container createContainer(int id, String host, Resource resource, Priority priority) { |
| @SuppressWarnings("deprecation") |
| ContainerId containerID = ContainerId.newInstance( |
| ApplicationAttemptId.newInstance(ApplicationId.newInstance(1, 1), 1), |
| id); |
| NodeId nodeID = NodeId.newInstance(host, 0); |
| Container container = Container.newInstance(containerID, nodeID, host + ":0", |
| resource, priority, null); |
| return container; |
| } |
| |
| private AMSchedulerEventTALaunchRequest createLaunchRequestEvent( |
| TezTaskAttemptID taID, TaskAttempt ta, Resource capability, |
| String[] hosts, String[] racks, Priority priority, |
| ContainerContext containerContext) { |
| TaskLocationHint locationHint = null; |
| if (hosts != null || racks != null) { |
| Set<String> hostsSet = Sets.newHashSet(hosts); |
| Set<String> racksSet = Sets.newHashSet(racks); |
| locationHint = TaskLocationHint.createTaskLocationHint(hostsSet, racksSet); |
| } |
| AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest( |
| taID, capability, new TaskSpec(taID, "dagName", "vertexName", -1, |
| ProcessorDescriptor.create("processorClassName"), |
| Collections.singletonList(new InputSpec("vertexName", |
| InputDescriptor.create("inputClassName"), 1)), |
| Collections.singletonList(new OutputSpec("vertexName", |
| OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint, |
| priority.getPriority(), containerContext, 0, 0, 0); |
| return lr; |
| } |
| |
| private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, |
| TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority) { |
| return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, |
| new HashMap<String, LocalResource>()); |
| } |
| |
| private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, |
| TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, |
| Map<String, LocalResource> localResources) { |
| return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, |
| localResources, ""); |
| } |
| |
| private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, |
| TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, |
| Map<String, LocalResource> localResources, String jvmOpts) { |
| return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, |
| new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), jvmOpts)); |
| } |
| |
| private static class ChangingDAGIDAnswer implements Answer<TezDAGID> { |
| |
| private TezDAGID dagID; |
| |
| public ChangingDAGIDAnswer(TezDAGID dagID) { |
| this.dagID = dagID; |
| } |
| |
| public void setDAGID(TezDAGID dagID) { |
| this.dagID = dagID; |
| } |
| |
| @Override |
| public TezDAGID answer(InvocationOnMock invocation) throws Throwable { |
| return this.dagID; |
| } |
| } |
| |
| private void verifyDeAllocateTask(TaskScheduler taskScheduler, Object ta, boolean taskSucceeded, |
| TaskAttemptEndReason endReason, String diagContains) { |
| ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class); |
| try { |
| verify(taskScheduler) |
| .deallocateTask(eq(ta), eq(taskSucceeded), eq(endReason), argumentCaptor.capture()); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| assertEquals(1, argumentCaptor.getAllValues().size()); |
| if (diagContains == null) { |
| assertNull(argumentCaptor.getValue()); |
| } else { |
| assertTrue(argumentCaptor.getValue().contains(diagContains)); |
| } |
| } |
| } |