blob: dabb8c32a56c1ce071eb1e4ae9819e617b352450 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.utils.FederationRegistryClient;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGContext;
import org.apache.hadoop.yarn.server.globalpolicygenerator.GPGUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The ApplicationCleaner is a runnable that cleans up old applications from
 * table applicationsHomeSubCluster in FederationStateStore.
 *
 * <p>Subclasses implement {@link #run()}. This base class parses the Router
 * contact spec in {@link #init(Configuration, GPGContext)} and provides
 * {@link #getRouterKnownApplications()} to collect the set of applications
 * reported by the Router over one or more queries.
 */
public abstract class ApplicationCleaner implements Runnable {
  private static final Logger LOG =
      LoggerFactory.getLogger(ApplicationCleaner.class);

  private Configuration conf;
  private GPGContext gpgContext;
  private FederationRegistryClient registryClient;

  /** Number of successful Router queries required before returning. */
  private int minRouterSuccessCount;
  /** Maximum number of Router query attempts, successful or not. */
  private int maxRouterRetry;
  /** Delay after a successful (but not yet sufficient) Router query. */
  private long routerQueryIntervalMillis;

  /**
   * Initializes the cleaner from configuration.
   *
   * <p>Reads {@link YarnConfiguration#GPG_APPCLEANER_CONTACT_ROUTER_SPEC},
   * which must hold exactly three comma-separated numbers:
   * {@code minSuccessCount,maxRetry,retryIntervalMillis}. Whitespace around
   * each value is ignored.
   *
   * @param config the configuration to read the Router spec from
   * @param context the GPG context; its registry client is retained
   * @throws YarnException if the spec does not have exactly three values, a
   *     value is not a valid number, the minimum success count is not
   *     positive or exceeds the maximum retry count, or the interval is
   *     negative
   */
  public void init(Configuration config, GPGContext context)
      throws YarnException {
    this.gpgContext = context;
    this.conf = config;
    this.registryClient = context.getRegistryClient();

    String routerSpecString =
        this.conf.get(YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC,
            YarnConfiguration.DEFAULT_GPG_APPCLEANER_CONTACT_ROUTER_SPEC);
    String[] specs = routerSpecString.split(",");
    if (specs.length != 3) {
      throw new YarnException("Expect three comma separated values in "
          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + " but get "
          + routerSpecString);
    }

    try {
      // Trim so that "3, 10, 600000" is accepted as well as "3,10,600000".
      this.minRouterSuccessCount = Integer.parseInt(specs[0].trim());
      this.maxRouterRetry = Integer.parseInt(specs[1].trim());
      this.routerQueryIntervalMillis = Long.parseLong(specs[2].trim());
    } catch (NumberFormatException e) {
      // Surface malformed config as the declared checked exception.
      throw new YarnException("Invalid number in "
          + YarnConfiguration.GPG_APPCLEANER_CONTACT_ROUTER_SPEC + ": "
          + routerSpecString, e);
    }

    if (this.minRouterSuccessCount <= 0) {
      throw new YarnException("minRouterSuccessCount "
          + this.minRouterSuccessCount + " should be positive");
    }
    if (this.minRouterSuccessCount > this.maxRouterRetry) {
      throw new YarnException("minRouterSuccessCount "
          + this.minRouterSuccessCount
          + " should not be larger than maxRouterRetry "
          + this.maxRouterRetry);
    }
    // A negative value would make Thread.sleep throw IllegalArgumentException
    // later, which the retry loop would misreport as a failed query.
    if (this.routerQueryIntervalMillis < 0) {
      throw new YarnException("Router query interval "
          + this.routerQueryIntervalMillis + " should not be negative");
    }

    LOG.info(
        "Initialized AppCleaner with Router query with min success {}, "
            + "max retry {}, retry interval {}",
        this.minRouterSuccessCount, this.maxRouterRetry,
        DurationFormatUtils.formatDurationISO(this.routerQueryIntervalMillis));
  }

  /**
   * Returns the GPG context passed to {@link #init(Configuration, GPGContext)}.
   *
   * @return the GPG context, or {@code null} before initialization
   */
  public GPGContext getGPGContext() {
    return this.gpgContext;
  }

  /**
   * Returns the registry client obtained from the GPG context during
   * {@link #init(Configuration, GPGContext)}.
   *
   * @return the registry client, or {@code null} before initialization
   */
  public FederationRegistryClient getRegistryClient() {
    return this.registryClient;
  }

  /**
   * Queries the Router web service once for its applications.
   *
   * @return the set of application ids reported by the Router
   * @throws YarnRuntimeException when the Router call fails or returns no
   *     response body
   */
  public Set<ApplicationId> getAppsFromRouter() throws YarnRuntimeException {
    String webAppAddress = WebAppUtils.getRouterWebAppURLWithScheme(conf);
    LOG.info("Contacting router at: {}", webAppAddress);
    AppsInfo appsInfo = (AppsInfo) GPGUtils.invokeRMWebService(conf,
        webAppAddress, RMWSConsts.APPS, AppsInfo.class,
        DeSelectFields.DeSelectType.RESOURCE_REQUESTS.toString());
    if (appsInfo == null) {
      // Report as a failed Router call rather than an opaque NPE.
      throw new YarnRuntimeException(
          "No application info returned from router at " + webAppAddress);
    }

    Set<ApplicationId> appSet = new HashSet<>();
    for (AppInfo appInfo : appsInfo.getApps()) {
      appSet.add(ApplicationId.fromString(appInfo.getAppId()));
    }
    return appSet;
  }

  /**
   * Get the set of known applications in the cluster from Router.
   *
   * <p>Queries the Router up to {@code maxRouterRetry} times and returns the
   * union of all results once {@code minRouterSuccessCount} queries have
   * succeeded. After each successful but insufficient query, it waits
   * {@code routerQueryIntervalMillis} ms, unless no attempts remain. Failed
   * queries are logged and retried immediately.
   *
   * @return the union of application ids seen across successful queries
   * @throws YarnException if fewer than {@code minRouterSuccessCount} queries
   *     succeed within {@code maxRouterRetry} attempts, or if the thread is
   *     interrupted while waiting between attempts
   */
  public Set<ApplicationId> getRouterKnownApplications() throws YarnException {
    int successCount = 0;
    int totalAttemptCount = 0;
    Set<ApplicationId> resultSet = new HashSet<>();
    while (totalAttemptCount < this.maxRouterRetry) {
      int attempt = totalAttemptCount++;
      try {
        Set<ApplicationId> routerApps = getAppsFromRouter();
        resultSet.addAll(routerApps);
        LOG.info("Attempt {}: {} known apps from Router, {} in total",
            attempt, routerApps.size(), resultSet.size());
      } catch (Exception e) {
        // Any failure counts as one failed attempt; keep retrying.
        LOG.warn("Router query attempt {} failed", attempt, e);
        continue;
      }

      successCount++;
      if (successCount >= this.minRouterSuccessCount) {
        return resultSet;
      }
      // Skip the wait when no attempt remains; the loop would just exit and
      // throw, so sleeping would only delay the failure.
      if (totalAttemptCount < this.maxRouterRetry) {
        waitForNextAttempt(attempt);
      }
    }
    throw new YarnException("Only " + successCount
        + " success Router queries after " + totalAttemptCount + " retries");
  }

  /**
   * Sleeps for the configured Router query interval.
   *
   * @param attempt index of the attempt that just finished, used in messages
   * @throws YarnException if interrupted while sleeping; the thread's
   *     interrupt status is restored before throwing
   */
  private void waitForNextAttempt(int attempt) throws YarnException {
    try {
      Thread.sleep(this.routerQueryIntervalMillis);
    } catch (InterruptedException e) {
      // Preserve the interrupt so the owning thread can shut down.
      Thread.currentThread().interrupt();
      throw new YarnException(
          "Sleep interrupted after Router query attempt " + attempt, e);
    }
  }

  /**
   * Performs one cleanup pass. Implemented by concrete cleaners.
   */
  @Override
  public abstract void run();
}