blob: 7c57ab12920687fa6c5a46b5ff263eb66dfbfc9a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Extends the FederationInterceptor and overrides methods to provide a testable
* implementation of FederationInterceptor.
*/
public class TestableFederationInterceptor extends FederationInterceptor {
  public static final Logger LOG =
      LoggerFactory.getLogger(TestableFederationInterceptor.class);

  /** Sleep between two polls of a heartbeat request queue, in milliseconds. */
  private static final long DRAIN_POLL_INTERVAL_MS = 10;

  /** Mock secondary RMs created so far, keyed by sub cluster id. */
  private ConcurrentHashMap<String, MockResourceManagerFacade>
      secondaryResourceManagers = new ConcurrentHashMap<>();

  /**
   * Bumped by 10000 for every new secondary mock RM; the resulting value is
   * passed to the mock RM constructor.
   */
  private final AtomicInteger runningIndex = new AtomicInteger(0);

  /**
   * Mock home RM. Either supplied through the constructor or lazily created
   * in {@link #createHomeRMProxy}.
   */
  private MockResourceManagerFacade mockRm;

  /**
   * Creates an interceptor that builds its mock home RM lazily and starts
   * with no secondary RMs.
   */
  public TestableFederationInterceptor() {
  }

  /**
   * Creates an interceptor backed by pre-built mock RMs.
   *
   * @param homeRM the mock home RM to hand out from createHomeRMProxy
   * @param secondaries the map of mock secondary RMs keyed by sub cluster id;
   *          the map is used directly, not copied
   */
  public TestableFederationInterceptor(MockResourceManagerFacade homeRM,
      ConcurrentHashMap<String, MockResourceManagerFacade> secondaries) {
    mockRm = homeRM;
    secondaryResourceManagers = secondaries;
  }

  /**
   * Returns a pool manager whose UAMs talk to the mock secondary RMs of this
   * interceptor.
   */
  @Override
  protected UnmanagedAMPoolManager createUnmanagedAMPoolManager(
      ExecutorService threadPool) {
    return new TestableUnmanagedAMPoolManager(threadPool);
  }

  /**
   * Returns a heartbeat handler that runs under a UGI built by
   * {@link #getUGIWithToken}.
   */
  @Override
  protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
      Configuration conf, ApplicationId appId,
      AMRMClientRelayer rmProxyRelayer) {
    return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
  }

  /**
   * Returns the mock home RM, creating it on first use when none was supplied
   * through the constructor.
   *
   * @return the mock home RM, cast unchecked to the requested protocol type
   */
  @SuppressWarnings("unchecked")
  @Override
  protected <T> T createHomeRMProxy(AMRMProxyApplicationContext appContext,
      Class<T> protocol, UserGroupInformation user) {
    synchronized (this) {
      if (mockRm == null) {
        mockRm = new MockResourceManagerFacade(
            new YarnConfiguration(super.getConf()), 0);
      }
    }
    return (T) mockRm;
  }

  /**
   * Returns the mock RM registered for the given sub cluster, creating and
   * registering a new one when the sub cluster is seen for the first time.
   *
   * @param proxyClass the protocol type the caller expects
   * @param conf configuration copied into a newly created mock RM
   * @param subClusterId key identifying the sub cluster
   * @return the mock RM for the sub cluster, cast unchecked to the protocol
   * @throws IOException declared for callers; not thrown by the visible code
   */
  @SuppressWarnings("unchecked")
  protected <T> T createSecondaryRMProxy(Class<T> proxyClass,
      Configuration conf, String subClusterId) throws IOException {
    // We create one instance of the mock resource manager per sub cluster. Keep
    // track of the instances of the RMs in the map keyed by the sub cluster id
    synchronized (this.secondaryResourceManagers) {
      // ConcurrentHashMap never stores null values, so a null result means
      // the sub cluster has no mock RM yet.
      MockResourceManagerFacade rm =
          this.secondaryResourceManagers.get(subClusterId);
      if (rm == null) {
        // The running index here is used to simulate different RM_EPOCH to
        // generate unique container identifiers in a federation environment
        rm = new MockResourceManagerFacade(
            new Configuration(conf), runningIndex.addAndGet(10000));
        this.secondaryResourceManagers.put(subClusterId, rm);
      }
      return (T) rm;
    }
  }

  /**
   * Calls setShouldReRegisterNext on the home mock RM (when it exists) and on
   * every secondary mock RM.
   */
  protected void setShouldReRegisterNext() {
    if (mockRm != null) {
      mockRm.setShouldReRegisterNext();
    }
    for (MockResourceManagerFacade subCluster : secondaryResourceManagers
        .values()) {
      subCluster.setShouldReRegisterNext();
    }
  }

  /**
   * @return the mock home RM, or null when it has not been supplied or
   *         created yet
   */
  protected MockResourceManagerFacade getHomeRM() {
    return mockRm;
  }

  /**
   * @return the live map of mock secondary RMs keyed by sub cluster id
   */
  protected ConcurrentHashMap<String, MockResourceManagerFacade>
      getSecondaryRMs() {
    return secondaryResourceManagers;
  }

  /**
   * @param scId the sub cluster id
   * @return the mock RM registered for the sub cluster, or null if none
   */
  protected MockResourceManagerFacade getSecondaryRM(String scId) {
    return secondaryResourceManagers.get(scId);
  }

  /**
   * Drain all async heartbeat threads, comes in two flavors:
   *
   * 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
   * pick up all pending heartbeat requests. Not necessarily wait for all
   * threads to finish processing the last request. This is used to make sure
   * all new UAM are launched by the async threads, but at the same time will
   * finish draining while (slow) RM is still processing the last heartbeat
   * request.
   *
   * 2. waitForAsyncHBThreadFinish == true. Wait for all async thread to finish
   * processing all heartbeat requests.
   *
   * An interrupt received while polling does not cut the wait short; it is
   * remembered and the thread's interrupt status is restored before this
   * method returns.
   *
   * @param waitForAsyncHBThreadFinish selects between the two flavors above
   * @throws YarnException if draining or querying a heartbeat handler fails
   */
  protected void drainAllAsyncQueue(boolean waitForAsyncHBThreadFinish)
      throws YarnException {
    // The flag is restored only at the end: re-interrupting right away would
    // make every following sleep fail immediately and turn the polling loops
    // into busy spins.
    boolean interrupted = false;
    try {
      LOG.info("waiting to drain home heartbeat handler");
      if (waitForAsyncHBThreadFinish) {
        getHomeHeartbeartHandler().drainHeartbeatThread();
      } else {
        while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
          interrupted |= sleepReportingInterrupt(DRAIN_POLL_INTERVAL_MS);
        }
      }

      LOG.info("waiting to drain UAM heartbeat handlers");
      UnmanagedAMPoolManager uamPool = getUnmanagedAMPool();
      if (waitForAsyncHBThreadFinish) {
        uamPool.drainUAMHeartbeats();
      } else {
        while (hasPendingUAMRequests(uamPool)) {
          interrupted |= sleepReportingInterrupt(DRAIN_POLL_INTERVAL_MS);
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Tells whether any UAM in the pool still has queued heartbeat requests.
   */
  private static boolean hasPendingUAMRequests(UnmanagedAMPoolManager uamPool)
      throws YarnException {
    for (String scId : uamPool.getAllUAMIds()) {
      if (uamPool.getRequestQueueSize(scId) > 0) {
        return true;
      }
    }
    return false;
  }

  /**
   * Sleeps for the given time and reports, instead of throwing, whether the
   * sleep was cut short by an interrupt.
   */
  private static boolean sleepReportingInterrupt(long millis) {
    try {
      Thread.sleep(millis);
      return false;
    } catch (InterruptedException e) {
      return true;
    }
  }

  /**
   * Builds a remote-user UGI named after the application attempt and attaches
   * an AMRMTokenIdentifier for that attempt to it.
   *
   * @param appAttemptId the application attempt to build the UGI for
   * @return the UGI carrying the token identifier
   */
  protected UserGroupInformation getUGIWithToken(
      ApplicationAttemptId appAttemptId) {
    UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser(appAttemptId.toString());
    AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
    ugi.addTokenIdentifier(token);
    return ugi;
  }

  /**
   * Extends the UnmanagedAMPoolManager and overrides methods to provide a
   * testable implementation of UnmanagedAMPoolManager.
   */
  protected class TestableUnmanagedAMPoolManager
      extends UnmanagedAMPoolManager {
    public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
      super(threadpool);
    }

    /**
     * Creates a TestableUnmanagedApplicationManager so that the UAM connects
     * to a mock RM.
     */
    @Override
    public UnmanagedApplicationManager createUAM(Configuration conf,
        ApplicationId appId, String queueName, String submitter,
        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
        String rmId) {
      return new TestableUnmanagedApplicationManager(conf, appId, queueName,
          submitter, appNameSuffix, keepContainersAcrossApplicationAttempts,
          rmId);
    }
  }

  /**
   * Extends the UnmanagedApplicationManager and overrides methods to provide a
   * testable implementation.
   */
  protected class TestableUnmanagedApplicationManager
      extends UnmanagedApplicationManager {
    public TestableUnmanagedApplicationManager(Configuration conf,
        ApplicationId appId, String queueName, String submitter,
        String appNameSuffix, boolean keepContainersAcrossApplicationAttempts,
        String rmName) {
      super(conf, appId, queueName, submitter, appNameSuffix,
          keepContainersAcrossApplicationAttempts, rmName);
    }

    /**
     * Returns a heartbeat handler that runs under a UGI built by
     * getUGIWithToken of the enclosing interceptor.
     */
    @Override
    protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(
        Configuration conf, ApplicationId appId,
        AMRMClientRelayer rmProxyRelayer) {
      return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
    }

    /**
     * We override this method here to return a mock RM instances. The base
     * class returns the proxy to the real RM which will not work in case of
     * stand alone test cases. The mock RM is looked up by the cluster id read
     * from the given configuration.
     */
    @Override
    protected <T> T createRMProxy(Class<T> protocol, Configuration config,
        UserGroupInformation user, Token<AMRMTokenIdentifier> token)
        throws IOException {
      return createSecondaryRMProxy(protocol, config,
          YarnConfiguration.getClusterId(config));
    }
  }

  /**
   * Wrap the handler thread so it calls from the same user.
   */
  protected class TestableAMRequestHandlerThread
      extends AMHeartbeatRequestHandler {
    public TestableAMRequestHandlerThread(Configuration conf,
        ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
      super(conf, applicationId, rmProxyRelayer);
    }

    /**
     * Runs the base handler loop inside doAs of the UGI returned by
     * getUGIWithToken. No exception escapes this method; failures are logged
     * so that a dying heartbeat thread is visible in the test output.
     */
    @Override
    public void run() {
      try {
        getUGIWithToken(getAttemptId())
            .doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() {
                TestableAMRequestHandlerThread.super.run();
                return null;
              }
            });
      } catch (InterruptedException e) {
        // Keep the interrupt status visible to whoever owns this thread.
        Thread.currentThread().interrupt();
        LOG.warn("Heartbeat handler thread was interrupted", e);
      } catch (Exception e) {
        LOG.error("Heartbeat handler thread terminated with an exception", e);
      }
    }
  }
}