blob: a4b6dc5ae580e78c4b577ecd86afdbb1666c3161 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContextImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Unit test for DefaultApplicationCleaner in GPG.
*/
public class TestDefaultApplicationCleaner {
private Configuration conf;
private MemoryFederationStateStore stateStore;
private FederationStateStoreFacade facade;
private ApplicationCleaner appCleaner;
private GPGContext gpgContext;
private RegistryOperations registry;
private FederationRegistryClient registryClient;
private List<ApplicationId> appIds;
// The list of applications returned by mocked router
private Set<ApplicationId> routerAppIds;
private ApplicationId appIdToAddConcurrently;
@Before
public void setup() throws Exception {
conf = new YarnConfiguration();
// No Router query retry
conf.set(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC, "1,1,0");
stateStore = new MemoryFederationStateStore();
stateStore.init(conf);
facade = FederationStateStoreFacade.getInstance();
facade.reinitialize(stateStore, conf);
registry = new FSRegistryOperationsService();
registry.init(conf);
registry.start();
UserGroupInformation user = UserGroupInformation.getCurrentUser();
registryClient = new FederationRegistryClient(conf, registry, user);
registryClient.cleanAllApplications();
Assert.assertEquals(0, registryClient.getAllApplications().size());
gpgContext = new GPGContextImpl();
gpgContext.setStateStoreFacade(facade);
gpgContext.setRegistryClient(registryClient);
appCleaner = new TestableDefaultApplicationCleaner();
appCleaner.init(conf, gpgContext);
routerAppIds = new HashSet<ApplicationId>();
appIds = new ArrayList<ApplicationId>();
for (int i = 0; i < 3; i++) {
ApplicationId appId = ApplicationId.newInstance(0, i);
appIds.add(appId);
SubClusterId subClusterId =
SubClusterId.newInstance("SUBCLUSTER-" + Integer.toString(i));
stateStore.addApplicationHomeSubCluster(
AddApplicationHomeSubClusterRequest.newInstance(
ApplicationHomeSubCluster.newInstance(appId, subClusterId)));
// Write some registry entries for the app
registryClient.writeAMRMTokenForUAM(appId, subClusterId.toString(),
new Token<AMRMTokenIdentifier>());
}
Assert.assertEquals(3, registryClient.getAllApplications().size());
appIdToAddConcurrently = null;
}
@After
public void breakDown() throws Exception {
if (stateStore != null) {
stateStore.close();
}
if (registryClient != null) {
registryClient.cleanAllApplications();
}
if (registry != null) {
registry.stop();
}
}
@Test
public void testFederationStateStoreAppsCleanUp() throws YarnException {
// Set first app to be still known by Router
ApplicationId appId = appIds.get(0);
routerAppIds.add(appId);
// Another random app not in stateStore known by Router
appId = ApplicationId.newInstance(100, 200);
routerAppIds.add(appId);
appCleaner.run();
// Only one app should be left
Assert.assertEquals(1,
stateStore
.getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest.newInstance())
.getAppsHomeSubClusters().size());
// The known app should not be cleaned in registry
Assert.assertEquals(1, registryClient.getAllApplications().size());
}
@Test
public void testConcurrentNewApp() throws YarnException {
appIdToAddConcurrently = ApplicationId.newInstance(1, 1);
appCleaner.run();
// The concurrently added app should be still there
Assert.assertEquals(1,
stateStore
.getApplicationsHomeSubCluster(
GetApplicationsHomeSubClusterRequest.newInstance())
.getAppsHomeSubClusters().size());
// The concurrently added app should be still there
Assert.assertEquals(1, registryClient.getAllApplications().size());
}
/**
* Testable version of DefaultApplicationCleaner.
*/
public class TestableDefaultApplicationCleaner
extends DefaultApplicationCleaner {
@Override
public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
if (appIdToAddConcurrently != null) {
SubClusterId scId = SubClusterId.newInstance("MySubClusterId");
try {
stateStore
.addApplicationHomeSubCluster(AddApplicationHomeSubClusterRequest
.newInstance(ApplicationHomeSubCluster
.newInstance(appIdToAddConcurrently, scId)));
} catch (YarnException e) {
throw new YarnRuntimeException(e);
}
registryClient.writeAMRMTokenForUAM(appIdToAddConcurrently,
scId.toString(), new Token<AMRMTokenIdentifier>());
}
return routerAppIds;
}
}
}