| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.library.vertexmanager; |
| |
| import static org.mockito.Mockito.*; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| |
| import org.apache.hadoop.yarn.api.records.Container; |
| import org.apache.hadoop.yarn.api.records.ContainerId; |
| import org.apache.tez.dag.api.EdgeProperty; |
| import org.apache.tez.dag.api.InputDescriptor; |
| import org.apache.tez.dag.api.OutputDescriptor; |
| import org.apache.tez.dag.api.TezUncheckedException; |
| import org.apache.tez.dag.api.VertexManagerPluginContext; |
| import org.apache.tez.dag.api.EdgeProperty.SchedulingType; |
| import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; |
| import org.apache.tez.dag.api.event.VertexState; |
| import org.apache.tez.dag.api.event.VertexStateUpdate; |
| import org.apache.tez.runtime.api.TaskAttemptIdentifier; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Captor; |
| import org.mockito.MockitoAnnotations; |
| |
| import com.google.common.collect.Lists; |
| |
| @SuppressWarnings("unchecked") |
| public class TestInputReadyVertexManager { |
| |
| @Captor |
| ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor; |
| |
| @Before |
| public void init() { |
| MockitoAnnotations.initMocks(this); |
| } |
| |
| @Test (timeout=5000) |
| public void testBasicScatterGather() throws Exception { |
| HashMap<String, EdgeProperty> mockInputVertices = |
| new HashMap<String, EdgeProperty>(); |
| String mockSrcVertexId1 = "Vertex1"; |
| EdgeProperty eProp1 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.SCATTER_GATHER, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| |
| String mockManagedVertexId = "Vertex"; |
| |
| VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); |
| when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); |
| when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); |
| when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); |
| mockInputVertices.put(mockSrcVertexId1, eProp1); |
| |
| InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(1)).vertexReconfigurationPlanned(); |
| // source vertex configured |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| verify(mockContext, times(1)).doneReconfiguringVertex(); |
| verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); |
| // then own vertex started |
| manager.onVertexStarted(Collections.singletonList( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); |
| verify(mockContext, times(0)).scheduleTasks(anyList()); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); |
| verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(2, requestCaptor.getValue().size()); |
| } |
| |
| @Test (timeout=5000) |
| public void testBasicOneToOne() throws Exception { |
| HashMap<String, EdgeProperty> mockInputVertices = |
| new HashMap<String, EdgeProperty>(); |
| String mockSrcVertexId1 = "Vertex1"; |
| EdgeProperty eProp1 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.ONE_TO_ONE, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| |
| String mockManagedVertexId = "Vertex"; |
| |
| VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); |
| when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); |
| when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); |
| when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); |
| mockInputVertices.put(mockSrcVertexId1, eProp1); |
| |
| InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(1)).vertexReconfigurationPlanned(); |
| // source vertex configured |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| verify(mockContext, times(1)).doneReconfiguringVertex(); |
| verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); |
| manager.onVertexStarted(Collections.singletonList( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); |
| verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(0, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); |
| verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); |
| verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); |
| } |
| |
| @Test (timeout=5000) |
| public void testDelayedConfigureOneToOne() throws Exception { |
| HashMap<String, EdgeProperty> mockInputVertices = |
| new HashMap<String, EdgeProperty>(); |
| String mockSrcVertexId1 = "Vertex1"; |
| EdgeProperty eProp1 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.ONE_TO_ONE, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| |
| String mockManagedVertexId = "Vertex"; |
| |
| VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); |
| when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); |
| when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); |
| when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); |
| mockInputVertices.put(mockSrcVertexId1, eProp1); |
| |
| InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(1)).vertexReconfigurationPlanned(); |
| verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); |
| // ok to have source task complete before anything else |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); |
| // first own vertex started |
| manager.onVertexStarted(Collections.singletonList( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); |
| // no scheduling as we are not configured yet |
| verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); |
| // then source vertex configured. now we start |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| verify(mockContext, times(1)).doneReconfiguringVertex(); |
| |
| verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); |
| verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); |
| } |
| |
| @Test (timeout=5000) |
| public void testComplex() throws Exception { |
| HashMap<String, EdgeProperty> mockInputVertices = |
| new HashMap<String, EdgeProperty>(); |
| String mockSrcVertexId1 = "Vertex1"; |
| EdgeProperty eProp1 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.SCATTER_GATHER, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| String mockSrcVertexId2 = "Vertex2"; |
| EdgeProperty eProp2 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.ONE_TO_ONE, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| String mockSrcVertexId3 = "Vertex3"; |
| EdgeProperty eProp3 = EdgeProperty.create( |
| EdgeProperty.DataMovementType.ONE_TO_ONE, |
| EdgeProperty.DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| OutputDescriptor.create("out"), |
| InputDescriptor.create("in")); |
| |
| String mockManagedVertexId = "Vertex"; |
| Container mockContainer2 = mock(Container.class); |
| ContainerId mockCId2 = mock(ContainerId.class); |
| when(mockContainer2.getId()).thenReturn(mockCId2); |
| Container mockContainer3 = mock(Container.class); |
| ContainerId mockCId3 = mock(ContainerId.class); |
| when(mockContainer3.getId()).thenReturn(mockCId3); |
| |
| VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class); |
| when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); |
| when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); |
| mockInputVertices.put(mockSrcVertexId1, eProp1); |
| mockInputVertices.put(mockSrcVertexId2, eProp2); |
| mockInputVertices.put(mockSrcVertexId3, eProp3); |
| |
| List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList(); |
| |
| // 1-1 sources do not match managed tasks. setParallelism called to make them match |
| when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); |
| InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(1)).vertexReconfigurationPlanned(); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); |
| verify(mockContext, times(1)).reconfigureVertex(3, null, null); |
| verify(mockContext, times(1)).doneReconfiguringVertex(); |
| manager.onVertexStarted(initialCompletions); |
| |
| // 1-1 sources do not match |
| when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); |
| when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4); |
| manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(2)).vertexReconfigurationPlanned(); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); |
| try { |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); |
| Assert.assertTrue("Should have exception", false); |
| } catch (TezUncheckedException e) { |
| e.getMessage().contains("1-1 source vertices must have identical concurrency"); |
| } |
| verify(mockContext, times(1)).reconfigureVertex(anyInt(), any(), any()); // not invoked |
| |
| when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); |
| |
| initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)); |
| initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0)); |
| manager = new InputReadyVertexManager(mockContext); |
| manager.initialize(); |
| verify(mockContext, times(3)).vertexReconfigurationPlanned(); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); |
| manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); |
| verify(mockContext, times(1)).reconfigureVertex(anyInt(), any(), any()); // not invoked |
| verify(mockContext, times(2)).doneReconfiguringVertex(); |
| manager.onVertexStarted(initialCompletions); |
| // all 1-1 0's done but not scheduled because v1 is not done |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0)); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1)); |
| verify(mockContext, times(0)).scheduleTasks(anyList()); |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done |
| verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(0, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion |
| // 1-1 completion triggers since other 1-1 is done |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1)); |
| verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(1, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion |
| // 1-1 completion does not trigger since other 1-1 is not done |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); |
| verify(mockContext, times(2)).scheduleTasks(anyList()); |
| // 1-1 completion trigger start |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2)); |
| verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); |
| Assert.assertEquals(1, requestCaptor.getValue().size()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); |
| Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getVertexName()); |
| Assert.assertEquals(2, requestCaptor.getValue().get(0) |
| .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion |
| |
| // no more starts |
| manager.onSourceTaskCompleted( |
| TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); |
| verify(mockContext, times(3)).scheduleTasks(anyList()); |
| |
| } |
| } |