blob: 474ae788235e5afc92cf638fa33c8ef878d65d19 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.CollectorInfo;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
/**
 * Test Base for ResourceManager's Protocol on HA.
 *
 * Limited scope:
 * For all the test cases, we only test whether the method will be re-entered
 * when failover happens. The tests do not cover the full logic of each method.
 *
 * Test strategy:
 * Create a separate failover thread that watches a trigger flag, and override
 * every API under test so that it sets that trigger flag.
 * When one of these APIs is called, it sets the trigger flag to true to kick
 * off the failover, so we can make sure the failover happens while the method
 * is being processed. If the API is marked as {@code @Idempotent} or
 * {@code @AtMostOnce}, the test cases will pass; otherwise, they will throw an
 * exception.
 *
 */
public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
  /** Request info used for every manual HA state transition in these tests. */
  protected static final HAServiceProtocol.StateChangeRequestInfo req =
      new HAServiceProtocol.StateChangeRequestInfo(
          HAServiceProtocol.RequestSource.REQUEST_BY_USER);

  protected static final String RM1_NODE_ID = "rm1";
  protected static final int RM1_PORT_BASE = 10000;
  protected static final String RM2_NODE_ID = "rm2";
  protected static final int RM2_PORT_BASE = 20000;

  protected Configuration conf;
  protected MiniYARNClusterForHATesting cluster;

  /** Background thread that performs the mid-call failover, once started. */
  protected Thread failoverThread = null;

  /** Loop guard for {@link #failoverThread}; cleared to make it exit. */
  private volatile boolean keepRunning;

  /**
   * Builds a fresh HA configuration for each test: two RMs
   * ({@value #RM1_NODE_ID}, {@value #RM2_NODE_ID}) on fixed RPC port bases,
   * client failover capped at 5 attempts with a 100 ms base sleep, and a mini
   * cluster that uses fixed ports and real RPC.
   */
  @Before
  public void setup() throws IOException {
    failoverThread = null;
    keepRunning = true;
    conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_MAX_ATTEMPTS, 5);
    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
    HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE, conf);
    HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE, conf);
    conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
  }

  /**
   * Stops the failover thread (if any) and then the mini cluster (if it was
   * ever started).
   */
  @After
  public void teardown() throws Exception {
    keepRunning = false;
    boolean interrupted = false;
    if (failoverThread != null) {
      failoverThread.interrupt();
      try {
        failoverThread.join();
      } catch (InterruptedException ex) {
        LOG.error("Error joining with failover thread", ex);
        interrupted = true;
      }
    }
    try {
      // cluster is only assigned by startHACluster(); a test that failed
      // before that point must not be masked by an NPE here.
      if (cluster != null) {
        cluster.stop();
      }
    } finally {
      // Restore the interrupt only after the cluster is stopped, so the stop
      // itself is not cut short.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Returns the admin service of the RM at {@code index} in the cluster.
   *
   * @param index RM index within {@link #cluster}
   * @return that RM's {@link AdminService}
   */
  protected AdminService getAdminService(int index) {
    return cluster.getResourceManager(index).getRMContext()
        .getRMAdminService();
  }

  /**
   * Moves the active RM to standby and activates the other RM, asserting that
   * the other RM is active afterwards. Assumes exactly two RMs.
   *
   * @throws IOException if an admin state transition fails
   */
  protected void explicitFailover() throws IOException {
    int activeRMIndex = cluster.getActiveRMIndex();
    assertTrue("No active RM to fail over from", activeRMIndex != -1);
    int newActiveRMIndex = (activeRMIndex + 1) % 2;
    getAdminService(activeRMIndex).transitionToStandby(req);
    getAdminService(newActiveRMIndex).transitionToActive(req);
    assertEquals("Failover failed", newActiveRMIndex,
        cluster.getActiveRMIndex());
  }

  /**
   * Creates, initializes and starts a {@link YarnClient} on a copy of
   * {@code conf}.
   *
   * @param conf configuration to copy for the client
   * @return the started client; the caller must stop it
   */
  protected YarnClient createAndStartYarnClient(Configuration conf) {
    Configuration configuration = new YarnConfiguration(conf);
    YarnClient client = YarnClient.createYarnClient();
    client.init(configuration);
    client.start();
    return client;
  }

  /**
   * Asserts that the NMs connect to the RM within 5 seconds and that a client
   * can reach the active RM.
   */
  protected void verifyConnections() throws InterruptedException,
      YarnException {
    assertTrue("NMs failed to connect to the RM",
        cluster.waitForNodeManagersToConnect(5000));
    verifyClientConnection();
  }

  /**
   * Tries up to three times to list applications through a fresh client, and
   * fails the test if every attempt throws.
   */
  protected void verifyClientConnection() {
    int numRetries = 3;
    while (numRetries-- > 0) {
      YarnClient client =
          createAndStartYarnClient(new YarnConfiguration(this.conf));
      try {
        Thread.sleep(100);
        client.getApplications();
        return;
      } catch (Exception e) {
        // Keep the stack trace: the message alone is often empty for RPC
        // failures.
        LOG.error("Client failed to reach the active RM", e);
      } finally {
        client.stop();
      }
    }
    fail("Client couldn't connect to the Active RM");
  }

  /**
   * Starts a thread that polls the cluster's start-failover flag every 50 ms.
   * Once the flag is set, the thread performs one {@link #explicitFailover()},
   * marks the failover as triggered (only on success), and exits. The thread
   * also exits when {@code keepRunning} is cleared or when it is interrupted.
   *
   * @return the started thread
   */
  protected Thread createAndStartFailoverThread() {
    // Set before start(): setting it inside run() could resurrect a thread
    // that teardown() has already asked to stop.
    keepRunning = true;
    Thread thread = new Thread(new Runnable() {
      @Override
      public void run() {
        while (keepRunning) {
          if (cluster.getStartFailoverFlag()) {
            try {
              explicitFailover();
              cluster.resetFailoverTriggeredFlag(true);
            } catch (Exception e) {
              LOG.error("Explicit failover failed", e);
            } finally {
              // At most one failover attempt per thread.
              keepRunning = false;
            }
          }
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            // teardown() interrupts this thread to stop it.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }, "ProtocolHATestBase-failover");
    thread.start();
    return thread;
  }

  /**
   * Starts a two-RM mini cluster with recovery on and automatic failover off.
   * It verifies connectivity, performs one explicit failover, verifies
   * connectivity again, and finally starts {@link #failoverThread}.
   *
   * @param numOfNMs number of NodeManagers to start
   * @param overrideClientRMService whether to use the failover-triggering
   *     ClientRMService
   * @param overrideRTS whether to use the failover-triggering
   *     ResourceTrackerService
   * @param overrideApplicationMasterService whether to use the
   *     failover-triggering ApplicationMasterService
   */
  protected void startHACluster(int numOfNMs, boolean overrideClientRMService,
      boolean overrideRTS, boolean overrideApplicationMasterService)
      throws Exception {
    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
    cluster =
        new MiniYARNClusterForHATesting(TestRMFailover.class.getName(), 2,
            numOfNMs, 1, 1, false, overrideClientRMService, overrideRTS,
            overrideApplicationMasterService);
    cluster.resetStartFailoverFlag(false);
    cluster.init(conf);
    cluster.start();
    assertFalse("RM never turned active", -1 == cluster.getActiveRMIndex());
    verifyConnections();
    // Do the failover
    explicitFailover();
    verifyConnections();
    failoverThread = createAndStartFailoverThread();
  }

  /** Returns the currently active ResourceManager. */
  protected ResourceManager getActiveRM() {
    return cluster.getResourceManager(cluster.getActiveRMIndex());
  }

  /**
   * MiniYARNCluster whose RMs can optionally replace their ClientRMService,
   * ResourceTrackerService and/or ApplicationMasterService. The replacements
   * request a failover at the start of each overridden call, wait for it,
   * and then answer (mostly with canned responses).
   */
  public class MiniYARNClusterForHATesting extends MiniYARNCluster {

    private final boolean overrideClientRMService;
    private final boolean overrideRTS;
    private final boolean overrideApplicationMasterService;
    /** Set by an overridden RPC to ask the failover thread to fail over. */
    private final AtomicBoolean startFailover = new AtomicBoolean(false);
    /** Set by the failover thread once its failover succeeded. */
    private final AtomicBoolean failoverTriggered = new AtomicBoolean(false);

    public MiniYARNClusterForHATesting(String testName,
        int numResourceManagers, int numNodeManagers, int numLocalDirs,
        int numLogDirs, boolean enableAHS, boolean overrideClientRMService,
        boolean overrideRTS, boolean overrideApplicationMasterService) {
      super(testName, numResourceManagers, numNodeManagers, numLocalDirs,
          numLogDirs, enableAHS);
      this.overrideClientRMService = overrideClientRMService;
      this.overrideRTS = overrideRTS;
      this.overrideApplicationMasterService = overrideApplicationMasterService;
    }

    public boolean getStartFailoverFlag() {
      return startFailover.get();
    }

    public void resetStartFailoverFlag(boolean flag) {
      startFailover.set(flag);
    }

    public void resetFailoverTriggeredFlag(boolean flag) {
      failoverTriggered.set(flag);
    }

    /**
     * Polls (50 x 100 ms) for the failover-triggered flag.
     *
     * @return false only if the failover thread has been created and no
     *     failover was observed within the wait; true otherwise (after an
     *     extra 100 ms pause)
     */
    private boolean waitForFailover() {
      final int maxPolls = 50;
      int polls = 0;
      while (!failoverTriggered.get() && polls < maxPolls) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // Deliberately ignored so the full wait window is honored.
        }
        polls++;
      }
      // Decide on the flag itself, not the poll count: a failover seen on
      // the very last poll is still a success.
      if (!failoverTriggered.get() && failoverThread != null) {
        return false;
      }
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Deliberately ignored, as above.
      }
      return true;
    }

    /**
     * Requests a failover and asserts that it was observed. Every overridden
     * RPC calls this before producing its response.
     */
    private void triggerFailoverAndWait() {
      resetStartFailoverFlag(true);
      Assert.assertTrue("Failover was not triggered in time",
          waitForFailover());
    }

    @Override
    protected ResourceManager createResourceManager() {
      return new ResourceManager() {
        @Override
        protected void doSecureLogin() throws IOException {
          // Don't try to login using keytab in the testcases.
        }

        @Override
        protected ClientRMService createClientRMService() {
          if (overrideClientRMService) {
            return new CustomedClientRMService(this.rmContext, this.scheduler,
                this.rmAppManager, this.applicationACLsManager,
                this.queueACLsManager,
                this.rmContext.getRMDelegationTokenSecretManager());
          }
          return super.createClientRMService();
        }

        @Override
        protected ResourceTrackerService createResourceTrackerService() {
          if (overrideRTS) {
            return new CustomedResourceTrackerService(this.rmContext,
                this.nodesListManager, this.nmLivelinessMonitor,
                this.rmContext.getContainerTokenSecretManager(),
                this.rmContext.getNMTokenSecretManager());
          }
          return super.createResourceTrackerService();
        }

        @Override
        protected ApplicationMasterService createApplicationMasterService() {
          if (overrideApplicationMasterService) {
            return new CustomedApplicationMasterService(this.rmContext,
                this.scheduler);
          }
          return super.createApplicationMasterService();
        }
      };
    }

    /** ClientRMService whose calls force a failover before answering. */
    private class CustomedClientRMService extends ClientRMService {
      public CustomedClientRMService(RMContext rmContext,
          YarnScheduler scheduler, RMAppManager rmAppManager,
          ApplicationACLsManager applicationACLsManager,
          QueueACLsManager queueACLsManager,
          RMDelegationTokenSecretManager rmDTSecretManager) {
        super(rmContext, scheduler, rmAppManager, applicationACLsManager,
            queueACLsManager, rmDTSecretManager);
      }

      @Override
      public GetNewApplicationResponse getNewApplication(
          GetNewApplicationRequest request) throws YarnException {
        triggerFailoverAndWait();
        // respond with a fake applicationId
        return GetNewApplicationResponse.newInstance(
            createFakeAppId(), null, null);
      }

      @Override
      public GetApplicationReportResponse getApplicationReport(
          GetApplicationReportRequest request) throws YarnException {
        triggerFailoverAndWait();
        return GetApplicationReportResponse.newInstance(createFakeAppReport());
      }

      @Override
      public GetClusterMetricsResponse getClusterMetrics(
          GetClusterMetricsRequest request) throws YarnException {
        triggerFailoverAndWait();
        return GetClusterMetricsResponse.newInstance(
            createFakeYarnClusterMetrics());
      }

      @Override
      public GetApplicationsResponse getApplications(
          GetApplicationsRequest request) throws YarnException {
        triggerFailoverAndWait();
        return GetApplicationsResponse.newInstance(createFakeAppReports());
      }

      @Override
      public GetClusterNodesResponse getClusterNodes(
          GetClusterNodesRequest request)
          throws YarnException {
        triggerFailoverAndWait();
        return GetClusterNodesResponse.newInstance(createFakeNodeReports());
      }

      @Override
      public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
          throws YarnException {
        triggerFailoverAndWait();
        return GetQueueInfoResponse.newInstance(createFakeQueueInfo());
      }

      @Override
      public GetQueueUserAclsInfoResponse getQueueUserAcls(
          GetQueueUserAclsInfoRequest request) throws YarnException {
        triggerFailoverAndWait();
        return GetQueueUserAclsInfoResponse
            .newInstance(createFakeQueueUserACLInfoList());
      }

      @Override
      public GetApplicationAttemptReportResponse getApplicationAttemptReport(
          GetApplicationAttemptReportRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return GetApplicationAttemptReportResponse
            .newInstance(createFakeApplicationAttemptReport());
      }

      @Override
      public GetApplicationAttemptsResponse getApplicationAttempts(
          GetApplicationAttemptsRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return GetApplicationAttemptsResponse
            .newInstance(createFakeApplicationAttemptReports());
      }

      @Override
      public GetContainerReportResponse getContainerReport(
          GetContainerReportRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return GetContainerReportResponse
            .newInstance(createFakeContainerReport());
      }

      @Override
      public GetContainersResponse getContainers(GetContainersRequest request)
          throws YarnException, IOException {
        triggerFailoverAndWait();
        return GetContainersResponse.newInstance(createFakeContainerReports());
      }

      /** Forces a failover, then delegates to the real submission logic. */
      @Override
      public SubmitApplicationResponse submitApplication(
          SubmitApplicationRequest request) throws YarnException, IOException {
        triggerFailoverAndWait();
        return super.submitApplication(request);
      }

      @Override
      public KillApplicationResponse forceKillApplication(
          KillApplicationRequest request) throws YarnException {
        triggerFailoverAndWait();
        return KillApplicationResponse.newInstance(true);
      }

      @Override
      public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
          MoveApplicationAcrossQueuesRequest request) throws YarnException {
        triggerFailoverAndWait();
        return Records.newRecord(MoveApplicationAcrossQueuesResponse.class);
      }

      @Override
      public GetDelegationTokenResponse getDelegationToken(
          GetDelegationTokenRequest request) throws YarnException {
        triggerFailoverAndWait();
        return GetDelegationTokenResponse.newInstance(createFakeToken());
      }

      @Override
      public RenewDelegationTokenResponse renewDelegationToken(
          RenewDelegationTokenRequest request) throws YarnException {
        triggerFailoverAndWait();
        return RenewDelegationTokenResponse
            .newInstance(createNextExpirationTime());
      }

      @Override
      public CancelDelegationTokenResponse cancelDelegationToken(
          CancelDelegationTokenRequest request) throws YarnException {
        triggerFailoverAndWait();
        return CancelDelegationTokenResponse.newInstance();
      }
    }

    /** Fake FINISHED/FAILED report for application (1000, 1), attempt 1. */
    public ApplicationReport createFakeAppReport() {
      ApplicationId appId = ApplicationId.newInstance(1000L, 1);
      ApplicationAttemptId attemptId =
          ApplicationAttemptId.newInstance(appId, 1);
      return ApplicationReport.newInstance(appId, attemptId, "fakeUser",
          "fakeQueue", "fakeApplicationName", "localhost", 0, null,
          YarnApplicationState.FINISHED, "fake an application report", "",
          1000L, 1000L, 1200L, FinalApplicationStatus.FAILED, null, "", 50f,
          "fakeApplicationType", null);
    }

    /** Single-element list holding {@link #createFakeAppReport()}. */
    public List<ApplicationReport> createFakeAppReports() {
      List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
      reports.add(createFakeAppReport());
      return reports;
    }

    public ApplicationId createFakeAppId() {
      return ApplicationId.newInstance(1000L, 1);
    }

    public ApplicationAttemptId createFakeApplicationAttemptId() {
      return ApplicationAttemptId.newInstance(createFakeAppId(), 0);
    }

    public ContainerId createFakeContainerId() {
      return ContainerId.newContainerId(createFakeApplicationAttemptId(), 0);
    }

    public YarnClusterMetrics createFakeYarnClusterMetrics() {
      return YarnClusterMetrics.newInstance(1);
    }

    /** Single RUNNING node report for localhost:0 in rack1. */
    public List<NodeReport> createFakeNodeReports() {
      NodeId nodeId = NodeId.newInstance("localhost", 0);
      NodeReport report =
          NodeReport.newInstance(nodeId, NodeState.RUNNING, "localhost",
              "rack1", null, null, 4, null, 1000L);
      List<NodeReport> reports = new ArrayList<NodeReport>();
      reports.add(report);
      return reports;
    }

    public QueueInfo createFakeQueueInfo() {
      return QueueInfo.newInstance("root", 100f, 100f, 50f, null,
          createFakeAppReports(), QueueState.RUNNING, null, null, null, false,
          null, false);
    }

    /** Single ACL entry granting SUBMIT_APPLICATIONS on queue "root". */
    public List<QueueUserACLInfo> createFakeQueueUserACLInfoList() {
      List<QueueACL> queueACL = new ArrayList<QueueACL>();
      queueACL.add(QueueACL.SUBMIT_APPLICATIONS);
      QueueUserACLInfo info = QueueUserACLInfo.newInstance("root", queueACL);
      List<QueueUserACLInfo> infos = new ArrayList<QueueUserACLInfo>();
      infos.add(info);
      return infos;
    }

    public ApplicationAttemptReport createFakeApplicationAttemptReport() {
      return ApplicationAttemptReport.newInstance(
          createFakeApplicationAttemptId(), "localhost", 0, "", "", "",
          YarnApplicationAttemptState.RUNNING, createFakeContainerId(), 1000L,
          1200L);
    }

    public List<ApplicationAttemptReport>
        createFakeApplicationAttemptReports() {
      List<ApplicationAttemptReport> reports =
          new ArrayList<ApplicationAttemptReport>();
      reports.add(createFakeApplicationAttemptReport());
      return reports;
    }

    /** COMPLETE container report on localhost:0. */
    public ContainerReport createFakeContainerReport() {
      NodeId nodeId = NodeId.newInstance("localhost", 0);
      return ContainerReport.newInstance(createFakeContainerId(), null,
          nodeId, null, 1000L, 1200L, "", "", 0,
          ContainerState.COMPLETE,
          "http://" + nodeId.toString());
    }

    public List<ContainerReport> createFakeContainerReports() {
      List<ContainerReport> reports =
          new ArrayList<ContainerReport>();
      reports.add(createFakeContainerReport());
      return reports;
    }

    /** Fake token; bytes are UTF-8 so they do not depend on the platform. */
    public Token createFakeToken() {
      String identifier = "fake Token";
      String password = "fake token passwd";
      return Token.newInstance(
          identifier.getBytes(java.nio.charset.StandardCharsets.UTF_8), " ",
          password.getBytes(java.nio.charset.StandardCharsets.UTF_8), " ");
    }

    /** Fake "expiration time": the UTF-8 byte length of "fake Token". */
    public long createNextExpirationTime() {
      return "fake Token"
          .getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
    }

    /** ResourceTrackerService whose calls force a failover, then delegate. */
    private class CustomedResourceTrackerService extends
        ResourceTrackerService {
      public CustomedResourceTrackerService(RMContext rmContext,
          NodesListManager nodesListManager,
          NMLivelinessMonitor nmLivelinessMonitor,
          RMContainerTokenSecretManager containerTokenSecretManager,
          NMTokenSecretManagerInRM nmTokenSecretManager) {
        super(rmContext, nodesListManager, nmLivelinessMonitor,
            containerTokenSecretManager, nmTokenSecretManager);
      }

      @Override
      public RegisterNodeManagerResponse registerNodeManager(
          RegisterNodeManagerRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return super.registerNodeManager(request);
      }

      @Override
      public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
          throws YarnException, IOException {
        triggerFailoverAndWait();
        return super.nodeHeartbeat(request);
      }
    }

    /** ApplicationMasterService whose calls force a failover, then fake. */
    private class CustomedApplicationMasterService extends
        ApplicationMasterService {
      public CustomedApplicationMasterService(RMContext rmContext,
          YarnScheduler scheduler) {
        super(rmContext, scheduler);
      }

      @Override
      public AllocateResponse allocate(AllocateRequest request)
          throws YarnException, IOException {
        triggerFailoverAndWait();
        return createFakeAllocateResponse();
      }

      @Override
      public RegisterApplicationMasterResponse registerApplicationMaster(
          RegisterApplicationMasterRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return createFakeRegisterApplicationMasterResponse();
      }

      @Override
      public FinishApplicationMasterResponse finishApplicationMaster(
          FinishApplicationMasterRequest request) throws YarnException,
          IOException {
        triggerFailoverAndWait();
        return createFakeFinishApplicationMasterResponse();
      }
    }

    /**
     * Fake registration response: min 2048/2, max 4096/4, MODIFY_APP open to
     * "*", queue "root", no previous containers or NM tokens.
     */
    public RegisterApplicationMasterResponse
        createFakeRegisterApplicationMasterResponse() {
      Resource minCapability = Resource.newInstance(2048, 2);
      Resource maxCapability = Resource.newInstance(4096, 4);
      Map<ApplicationAccessType, String> acls =
          new HashMap<ApplicationAccessType, String>();
      acls.put(ApplicationAccessType.MODIFY_APP, "*");
      ByteBuffer key = ByteBuffer.wrap(
          "fake_key".getBytes(java.nio.charset.StandardCharsets.UTF_8));
      return RegisterApplicationMasterResponse.newInstance(minCapability,
          maxCapability, acls, key, new ArrayList<Container>(), "root",
          new ArrayList<NMToken>());
    }

    public FinishApplicationMasterResponse
        createFakeFinishApplicationMasterResponse() {
      return FinishApplicationMasterResponse.newInstance(true);
    }

    /**
     * Fake empty allocate response with 1024/2 available. When timeline
     * service v2 is enabled in this cluster's config, it also carries fake
     * collector info.
     */
    public AllocateResponse createFakeAllocateResponse() {
      if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
        return AllocateResponse.newInstance(-1,
            new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
            new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
            null, new ArrayList<NMToken>(), CollectorInfo.newInstance(
                "host:port", Token.newInstance(new byte[] {0}, "TIMELINE",
                new byte[] {0}, "rm")));
      } else {
        return AllocateResponse.newInstance(-1,
            new ArrayList<ContainerStatus>(),
            new ArrayList<Container>(), new ArrayList<NodeReport>(),
            Resource.newInstance(1024, 2), null, 1,
            null, new ArrayList<NMToken>());
      }
    }
  }
}