blob: dd48241d72553640fcbd0af897cc80912930bd31 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.metrics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.metrics.AMRMClientRelayerMetrics.RequestType;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Unit tests for the {@link AMRMClientRelayerMetrics} values that are updated
* when requests are sent through an AMRMClientRelayer.
*/
public class TestAMRMClientRelayerMetrics {
/**
 * Mock AMS for easier testing and mocking of request/responses.
 *
 * <p>The mock remembers the lists carried by the most recent
 * {@link #allocate} call that was not aborted by a failure flag, and always
 * answers with the same mutable {@link AllocateResponse} instance. Two
 * one-shot flags make an upcoming call fail; each flag is cleared at the
 * moment it fires:
 * <ul>
 * <li>{@code failover}: the next {@code allocate} or
 * {@code finishApplicationMaster} call throws
 * {@link ApplicationMasterNotRegisteredException};</li>
 * <li>{@code exception}: the next {@code allocate} call that gets past the
 * failover check throws a plain {@link YarnException}.</li>
 * </ul>
 */
public static class MockApplicationMasterService
    implements ApplicationMasterProtocol {

  // One-shot failure switches, see the class comment.
  private boolean failover = false;
  private boolean exception = false;

  // Contents of the last allocate request that was actually processed.
  private List<ResourceRequest> lastAsk;
  private List<ContainerId> lastRelease;
  private List<UpdateContainerRequest> lastUpdates;
  private List<String> lastBlacklistAdditions;
  private List<String> lastBlacklistRemovals;

  // Canned response handed back by every successful allocate call.
  private AllocateResponse response = AllocateResponse
      .newInstance(0, null, null, new ArrayList<NodeReport>(),
          Resource.newInstance(0, 0), null, 0, null, null);

  /**
   * Accepts any registration.
   *
   * @param request ignored
   * @return always {@code null}
   */
  @Override
  public RegisterApplicationMasterResponse registerApplicationMaster(
      RegisterApplicationMasterRequest request)
      throws YarnException, IOException {
    return null;
  }

  /**
   * Accepts any finish request unless the failover flag is armed.
   *
   * @param request ignored
   * @return always {@code null}
   * @throws ApplicationMasterNotRegisteredException once, if the failover
   *         flag was set; the flag is cleared before throwing
   */
  @Override
  public FinishApplicationMasterResponse finishApplicationMaster(
      FinishApplicationMasterRequest request)
      throws YarnException, IOException {
    if (this.failover) {
      this.failover = false;
      throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
    }
    return null;
  }

  /**
   * Records the content of {@code request} and returns the canned response.
   *
   * <p>The failover flag is checked before the exception flag, so when both
   * are armed the first call fails with the failover error and the second
   * call fails with the generic error.
   *
   * @param request the allocate request to record
   * @return the shared canned {@link AllocateResponse}
   * @throws ApplicationMasterNotRegisteredException once, if the failover
   *         flag was set
   * @throws YarnException once, if the exception flag was set
   */
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {
    if (this.failover) {
      this.failover = false;
      throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
    }
    if (this.exception) {
      this.exception = false;
      throw new YarnException("Mock RM encountered exception");
    }
    this.lastAsk = request.getAskList();
    this.lastRelease = request.getReleaseList();
    this.lastUpdates = request.getUpdateRequests();
    // Guard against a request that carries no blacklist section: record
    // "nothing sent" instead of failing with a NullPointerException.
    ResourceBlacklistRequest blacklist = request.getResourceBlacklistRequest();
    this.lastBlacklistAdditions =
        blacklist == null ? null : blacklist.getBlacklistAdditions();
    this.lastBlacklistRemovals =
        blacklist == null ? null : blacklist.getBlacklistRemovals();
    return response;
  }

  /** Arms the one-shot failover flag. */
  public void setFailoverFlag() {
    this.failover = true;
  }
}
// Configuration instance created in setup().
private Configuration conf;
// Mock protocol endpoint that both relayers below are constructed with.
private MockApplicationMasterService mockAMS;
// Id handed to the "home" relayer and used to look up its metrics.
private String homeID = "home";
private AMRMClientRelayer homeRelayer;
// Id handed to the "uam" relayer and used to look up its metrics.
private String uamID = "uam";
private AMRMClientRelayer uamRelayer;
// Building blocks of the next allocate request (see getAllocateRequest());
// tests mutate these lists and clearAllocateRequestLists() empties them.
private List<ResourceRequest> asks = new ArrayList<>();
private List<ContainerId> releases = new ArrayList<>();
private List<UpdateContainerRequest> updates = new ArrayList<>();
private List<String> blacklistAdditions = new ArrayList<>();
private List<String> blacklistRemoval = new ArrayList<>();
/**
 * Creates a fresh mock AMS with a "home" and a "uam" relayer on top of it,
 * registers both relayers, empties the request lists and sets the client
 * pending value of every request type to zero for both ids.
 */
@Before
public void setup() throws YarnException, IOException {
  this.conf = new Configuration();
  this.mockAMS = new MockApplicationMasterService();

  this.homeRelayer = new AMRMClientRelayer(this.mockAMS,
      ApplicationId.newInstance(0, 0), this.homeID);
  this.homeRelayer.registerApplicationMaster(
      RegisterApplicationMasterRequest.newInstance("", 0, ""));

  this.uamRelayer = new AMRMClientRelayer(this.mockAMS,
      ApplicationId.newInstance(0, 0), this.uamID);
  this.uamRelayer.registerApplicationMaster(
      RegisterApplicationMasterRequest.newInstance("", 0, ""));

  clearAllocateRequestLists();

  // NOTE(review): assumes getInstance() hands back the same shared object on
  // every call, so looking it up once is equivalent to looking it up per use.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();
  RequestType[] pendingTypes = {
      RequestType.Guaranteed,
      RequestType.Opportunistic,
      RequestType.Promote,
      RequestType.Demote,
  };
  // Home id first, then uam id; within each, the order listed above.
  for (String clientId : new String[] {this.homeID, this.uamID}) {
    for (RequestType type : pendingTypes) {
      metrics.setClientPending(clientId, type, 0);
    }
  }
}
/**
 * Assembles an allocate request from the current content of the request
 * list fields.
 *
 * <p>A new request is built on every call. Per the original author's note,
 * the protobuf implementation copies the lists instead of referring to them
 * directly, so a previously built request would not reflect later edits.
 *
 * @return a newly built request with response id 0 and progress 0
 */
private AllocateRequest getAllocateRequest() {
  List<ContainerId> releaseSnapshot = new ArrayList<>(this.releases);
  List<UpdateContainerRequest> updateSnapshot =
      new ArrayList<>(this.updates);
  ResourceBlacklistRequest blacklist = ResourceBlacklistRequest.newInstance(
      new ArrayList<>(this.blacklistAdditions),
      new ArrayList<>(this.blacklistRemoval));

  return AllocateRequest.newBuilder()
      .responseId(0)
      .progress(0)
      .askList(asks)
      .releaseList(releaseSnapshot)
      .resourceBlacklistRequest(blacklist)
      .updateRequests(updateSnapshot)
      .build();
}
/** Empties every list that feeds {@code getAllocateRequest()}. */
private void clearAllocateRequestLists() {
  List<?>[] requestLists = new List<?>[] {
      this.asks,
      this.releases,
      this.updates,
      this.blacklistAdditions,
      this.blacklistRemoval,
  };
  for (List<?> requestList : requestLists) {
    requestList.clear();
  }
}
/**
 * Builds a request to promote a container to GUARANTEED execution.
 *
 * @param id numeric id passed to {@code createContainerId}
 * @return a PROMOTE_EXECUTION_TYPE update request with version 0 and a
 *         zero-sized resource
 */
private static UpdateContainerRequest createPromote(int id) {
  ContainerId target = createContainerId(id);
  Resource noResource = Resource.newInstance(0, 0);
  return UpdateContainerRequest.newInstance(0, target,
      ContainerUpdateType.PROMOTE_EXECUTION_TYPE, noResource,
      ExecutionType.GUARANTEED);
}
/**
 * Builds a request to demote a container to OPPORTUNISTIC execution.
 *
 * @param id numeric id passed to {@code createContainerId}
 * @return a DEMOTE_EXECUTION_TYPE update request with version 0 and a
 *         zero-sized resource
 */
private static UpdateContainerRequest createDemote(int id) {
  ContainerId target = createContainerId(id);
  Resource noResource = Resource.newInstance(0, 0);
  return UpdateContainerRequest.newInstance(0, target,
      ContainerUpdateType.DEMOTE_EXECUTION_TYPE, noResource,
      ExecutionType.OPPORTUNISTIC);
}
/**
 * Builds a container id under a fixed application attempt
 * (application (1, 1), attempt 1).
 *
 * @param id the container-specific part of the id
 * @return the container id for {@code id} within that attempt
 */
private static ContainerId createContainerId(int id) {
  ApplicationId appId = ApplicationId.newInstance(1, 1);
  ApplicationAttemptId attemptId =
      ApplicationAttemptId.newInstance(appId, 1);
  return ContainerId.newContainerId(attemptId, id);
}
/**
 * Creates a resource request record populated from the given values.
 *
 * @param id allocation request id
 * @param resource resource name (for example a node, a rack, or
 *        {@code ResourceRequest.ANY})
 * @param memory memory of the requested capability
 * @param vCores virtual cores of the requested capability
 * @param priority numeric priority
 * @param execType execution type to request
 * @param containers number of containers asked for
 * @return the populated request
 */
public ResourceRequest createResourceRequest(long id, String resource,
    int memory, int vCores, int priority, ExecutionType execType,
    int containers) {
  ResourceRequest request = Records.newRecord(ResourceRequest.class);

  request.setAllocationRequestId(id);
  request.setResourceName(resource);

  Resource capability = Resource.newInstance(memory, vCores);
  request.setCapability(capability);

  Priority requestPriority = Priority.newInstance(priority);
  request.setPriority(requestPriority);

  ExecutionTypeRequest execTypeRequest =
      ExecutionTypeRequest.newInstance(execType);
  request.setExecutionTypeRequest(execTypeRequest);

  request.setNumContainers(containers);
  return request;
}
/**
 * Sends GUARANTEED asks through both relayers and checks that the pending
 * gauge of each client id matches the container count of the ANY ask that
 * client last sent.
 */
@Test
public void testGPending() throws YarnException, IOException {
  // NOTE(review): assumes getInstance() returns the same shared object on
  // every call.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();

  // Ask for two containers, one with location preference
  ResourceRequest nodeAsk = createResourceRequest(0, "node", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest rackAsk = createResourceRequest(0, "rack", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest anyAsk = createResourceRequest(0, ResourceRequest.ANY,
      2048, 1, 1, ExecutionType.GUARANTEED, 2);
  this.asks.add(nodeAsk);
  this.asks.add(rackAsk);
  this.asks.add(anyAsk);

  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(0,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Ask from the uam
  this.uamRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Update the any to ask for an extra container
  anyAsk.setNumContainers(3);
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(3,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Update the any to ask to pretend a container was allocated
  anyAsk.setNumContainers(2);
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
}
/**
 * Checks that the pending-promote gauge of the home relayer drops when a
 * promotion is replaced by a demotion, when the RM reports the container as
 * promoted, and when the RM reports the container as completed.
 */
@Test
public void testPromotePending() throws YarnException, IOException {
  // NOTE(review): assumes getInstance() returns the same shared object on
  // every call.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();
  // The mock always returns this one response object; edit it in place.
  AllocateResponse rmResponse = this.mockAMS.response;

  // Ask to promote 3 containers
  for (int containerNo = 1; containerNo <= 3; containerNo++) {
    this.updates.add(createPromote(containerNo));
  }
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(3,
      metrics.getPendingMetric(homeID, RequestType.Promote).value());

  // Demote 2 containers, one of which is pending promote
  this.updates.remove(createPromote(3));
  this.updates.add(createDemote(3));
  this.updates.add(createDemote(4));
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Promote).value());

  // Let the RM respond with two successful promotions, one of which
  // was pending promote
  List<UpdatedContainer> updated = new ArrayList<>();
  Container promotedPending =
      Container.newInstance(createContainerId(2), null, null, null, null,
          null);
  updated.add(UpdatedContainer.newInstance(
      ContainerUpdateType.PROMOTE_EXECUTION_TYPE, promotedPending));
  Container promotedUnknown =
      Container.newInstance(createContainerId(5), null, null, null, null,
          null);
  updated.add(UpdatedContainer.newInstance(
      ContainerUpdateType.PROMOTE_EXECUTION_TYPE, promotedUnknown));
  rmResponse.setUpdatedContainers(updated);
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(1,
      metrics.getPendingMetric(homeID, RequestType.Promote).value());

  // Remove the promoted container and clean up response
  rmResponse.getUpdatedContainers().clear();
  this.updates.remove(createPromote(2));

  // Let the RM respond with two completed containers, one of which was
  // pending promote
  List<ContainerStatus> completed = new ArrayList<>();
  ContainerStatus completedPending =
      ContainerStatus.newInstance(createContainerId(1), null, "", 0);
  completed.add(completedPending);
  ContainerStatus completedUnknown =
      ContainerStatus.newInstance(createContainerId(6), null, "", 0);
  completed.add(completedUnknown);
  rmResponse.setCompletedContainersStatuses(completed);
  this.homeRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(0,
      metrics.getPendingMetric(homeID, RequestType.Promote).value());
}
/**
 * Checks that shutting down the home relayer brings its pending GUARANTEED
 * and pending promote gauges back to zero.
 */
@Test
public void testCleanUpOnFinish() throws YarnException, IOException {
  // NOTE(review): assumes getInstance() returns the same shared object on
  // every call.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();

  // Ask for two containers, one with location preference
  ResourceRequest nodeAsk = createResourceRequest(0, "node", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest rackAsk = createResourceRequest(0, "rack", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest anyAsk = createResourceRequest(0, ResourceRequest.ANY,
      2048, 1, 1, ExecutionType.GUARANTEED, 2);
  this.asks.add(nodeAsk);
  this.asks.add(rackAsk);
  this.asks.add(anyAsk);

  // Ask to promote 3 containers
  for (int containerNo = 1; containerNo <= 3; containerNo++) {
    this.updates.add(createPromote(containerNo));
  }

  // Run the allocate call to start tracking pending
  this.homeRelayer.allocate(getAllocateRequest());

  // After finish, the metrics should reset to zero
  this.homeRelayer.shutdown();
  Assert.assertEquals(0,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(0,
      metrics.getPendingMetric(homeID, RequestType.Promote).value());
}
/**
 * Drives the home relayer through mock RM failovers and mock RM errors and
 * checks the failover, heartbeat-success, heartbeat-failure and pending
 * GUARANTEED values after every step.
 *
 * <p>The counters are compared against values read at the start of the
 * relevant phase rather than against absolute numbers.
 */
@Test
public void testFailover() throws YarnException, IOException {
  // NOTE(review): assumes getInstance() returns the same shared object on
  // every call.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();

  // Ask for two containers, one with location preference
  ResourceRequest nodeAsk = createResourceRequest(0, "node", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest rackAsk = createResourceRequest(0, "rack", 2048, 1, 1,
      ExecutionType.GUARANTEED, 1);
  ResourceRequest anyAsk = createResourceRequest(0, ResourceRequest.ANY,
      2048, 1, 1, ExecutionType.GUARANTEED, 2);
  this.asks.add(nodeAsk);
  this.asks.add(rackAsk);
  this.asks.add(anyAsk);

  long expectedSuccess = metrics.getHeartbeatSuccessMetric(homeID).value();
  long expectedFailover =
      metrics.getRMMasterSlaveSwitchMetric(homeID).value();

  // Set failover to trigger
  mockAMS.setFailoverFlag();
  this.homeRelayer.allocate(getAllocateRequest());
  // The failover metric should be incremented
  expectedFailover++;
  Assert.assertEquals(expectedFailover,
      metrics.getRMMasterSlaveSwitchMetric(homeID).value());
  // The success metric should be incremented once
  expectedSuccess++;
  Assert.assertEquals(expectedSuccess,
      metrics.getHeartbeatSuccessMetric(homeID).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(0,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Ask from the uam
  this.uamRelayer.allocate(getAllocateRequest());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Update the any to ask for an extra container
  anyAsk.setNumContainers(3);
  mockAMS.setFailoverFlag();
  this.homeRelayer.allocate(getAllocateRequest());
  // The failover metric should be incremented
  expectedFailover++;
  Assert.assertEquals(expectedFailover,
      metrics.getRMMasterSlaveSwitchMetric(homeID).value());
  // The success metric should be incremented once
  expectedSuccess++;
  Assert.assertEquals(expectedSuccess,
      metrics.getHeartbeatSuccessMetric(homeID).value());
  Assert.assertEquals(3,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  // Update the any to ask to pretend a container was allocated
  anyAsk.setNumContainers(2);
  mockAMS.setFailoverFlag();
  this.homeRelayer.allocate(getAllocateRequest());
  // The failover metric should be incremented
  expectedFailover++;
  Assert.assertEquals(expectedFailover,
      metrics.getRMMasterSlaveSwitchMetric(homeID).value());
  // The success metric should be incremented once
  expectedSuccess++;
  Assert.assertEquals(expectedSuccess,
      metrics.getHeartbeatSuccessMetric(homeID).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(2,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());

  long expectedFailure = metrics.getHeartbeatFailureMetric(homeID).value();

  // Plain RM error, no failover involved
  mockAMS.exception = true;
  try {
    this.homeRelayer.allocate(getAllocateRequest());
    Assert.fail();
  } catch (YarnException expected) {
    // The mock was told to fail this call; nothing to do.
  }
  // The failover metric should not be incremented
  Assert.assertEquals(expectedFailover,
      metrics.getRMMasterSlaveSwitchMetric(homeID).value());
  // The success metric should not be incremented
  Assert.assertEquals(expectedSuccess,
      metrics.getHeartbeatSuccessMetric(homeID).value());
  // The failure metric should be incremented
  expectedFailure++;
  Assert.assertEquals(expectedFailure,
      metrics.getHeartbeatFailureMetric(homeID).value());

  // Failover and RM error armed at the same time
  mockAMS.setFailoverFlag();
  mockAMS.exception = true;
  try {
    this.homeRelayer.allocate(getAllocateRequest());
    Assert.fail();
  } catch (YarnException expected) {
    // The mock was told to fail this call; nothing to do.
  }
  // The failover metric should be incremented
  expectedFailover++;
  Assert.assertEquals(expectedFailover,
      metrics.getRMMasterSlaveSwitchMetric(homeID).value());
  // The success metric should not be incremented
  Assert.assertEquals(expectedSuccess,
      metrics.getHeartbeatSuccessMetric(homeID).value());
  // The failure metric should be incremented
  expectedFailure++;
  Assert.assertEquals(expectedFailure,
      metrics.getHeartbeatFailureMetric(homeID).value());
}
/**
 * Checks that a new ask for zero containers leaves the pending GUARANTEED
 * gauge of both client ids at zero.
 */
@Test
public void testNewEmptyRequest()
    throws YarnException, IOException {
  // NOTE(review): assumes getInstance() returns the same shared object on
  // every call.
  AMRMClientRelayerMetrics metrics = AMRMClientRelayerMetrics.getInstance();

  // Ask for zero containers
  ResourceRequest emptyAsk = createResourceRequest(1, ResourceRequest.ANY,
      2048, 1, 1, ExecutionType.GUARANTEED, 0);
  this.asks.add(emptyAsk);
  this.homeRelayer.allocate(getAllocateRequest());

  Assert.assertEquals(0,
      metrics.getPendingMetric(homeID, RequestType.Guaranteed).value());
  Assert.assertEquals(0,
      metrics.getPendingMetric(uamID, RequestType.Guaranteed).value());
}
}