blob: 3734e39d91d5fe2360e603d23c28445b906a8811 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationAttemptHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ApplicationHistoryData;
import org.apache.hadoop.yarn.server.applicationhistoryservice.records.ContainerHistoryData;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ApplicationHistoryManagerImpl extends AbstractService implements
ApplicationHistoryManager {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationHistoryManagerImpl.class);
private static final String UNAVAILABLE = "N/A";
private ApplicationHistoryStore historyStore;
private String serverHttpAddress;
public ApplicationHistoryManagerImpl() {
super(ApplicationHistoryManagerImpl.class.getName());
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
LOG.info("ApplicationHistory Init");
historyStore = createApplicationHistoryStore(conf);
historyStore.init(conf);
serverHttpAddress = WebAppUtils.getHttpSchemePrefix(conf) +
WebAppUtils.getAHSWebAppURLWithoutScheme(conf);
super.serviceInit(conf);
}
@Override
protected void serviceStart() throws Exception {
LOG.info("Starting ApplicationHistory");
historyStore.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping ApplicationHistory");
historyStore.stop();
super.serviceStop();
}
protected ApplicationHistoryStore createApplicationHistoryStore(
Configuration conf) {
return ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.APPLICATION_HISTORY_STORE,
FileSystemApplicationHistoryStore.class,
ApplicationHistoryStore.class), conf);
}
@Override
public ContainerReport getAMContainer(ApplicationAttemptId appAttemptId)
throws IOException {
ApplicationReport app =
getApplication(appAttemptId.getApplicationId());
return convertToContainerReport(historyStore.getAMContainer(appAttemptId),
app == null ? null : app.getUser());
}
@Override
public Map<ApplicationId, ApplicationReport> getApplications(long appsNum,
long appStartedTimeBegin, long appStartedTimeEnd) throws IOException {
Map<ApplicationId, ApplicationHistoryData> histData =
historyStore.getAllApplications();
HashMap<ApplicationId, ApplicationReport> applicationsReport =
new HashMap<ApplicationId, ApplicationReport>();
for (Entry<ApplicationId, ApplicationHistoryData> entry : histData
.entrySet()) {
applicationsReport.put(entry.getKey(),
convertToApplicationReport(entry.getValue()));
}
return applicationsReport;
}
@Override
public ApplicationReport getApplication(ApplicationId appId)
throws IOException {
return convertToApplicationReport(historyStore.getApplication(appId));
}
private ApplicationReport convertToApplicationReport(
ApplicationHistoryData appHistory) throws IOException {
ApplicationAttemptId currentApplicationAttemptId = null;
String trackingUrl = UNAVAILABLE;
String host = UNAVAILABLE;
int rpcPort = -1;
ApplicationAttemptHistoryData lastAttempt =
getLastAttempt(appHistory.getApplicationId());
if (lastAttempt != null) {
currentApplicationAttemptId = lastAttempt.getApplicationAttemptId();
trackingUrl = lastAttempt.getTrackingURL();
host = lastAttempt.getHost();
rpcPort = lastAttempt.getRPCPort();
}
return ApplicationReport.newInstance(appHistory.getApplicationId(),
currentApplicationAttemptId, appHistory.getUser(), appHistory.getQueue(),
appHistory.getApplicationName(), host, rpcPort, null,
appHistory.getYarnApplicationState(), appHistory.getDiagnosticsInfo(),
trackingUrl, appHistory.getStartTime(), 0, appHistory.getFinishTime(),
appHistory.getFinalApplicationStatus(), null, "", 100,
appHistory.getApplicationType(), null);
}
private ApplicationAttemptHistoryData getLastAttempt(ApplicationId appId)
throws IOException {
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> attempts =
historyStore.getApplicationAttempts(appId);
ApplicationAttemptId prevMaxAttemptId = null;
for (ApplicationAttemptId attemptId : attempts.keySet()) {
if (prevMaxAttemptId == null) {
prevMaxAttemptId = attemptId;
} else {
if (prevMaxAttemptId.getAttemptId() < attemptId.getAttemptId()) {
prevMaxAttemptId = attemptId;
}
}
}
return attempts.get(prevMaxAttemptId);
}
private ApplicationAttemptReport convertToApplicationAttemptReport(
ApplicationAttemptHistoryData appAttemptHistory) {
return ApplicationAttemptReport.newInstance(
appAttemptHistory.getApplicationAttemptId(), appAttemptHistory.getHost(),
appAttemptHistory.getRPCPort(), appAttemptHistory.getTrackingURL(), null,
appAttemptHistory.getDiagnosticsInfo(),
appAttemptHistory.getYarnApplicationAttemptState(),
appAttemptHistory.getMasterContainerId());
}
@Override
public ApplicationAttemptReport getApplicationAttempt(
ApplicationAttemptId appAttemptId) throws IOException {
return convertToApplicationAttemptReport(historyStore
.getApplicationAttempt(appAttemptId));
}
@Override
public Map<ApplicationAttemptId, ApplicationAttemptReport>
getApplicationAttempts(ApplicationId appId) throws IOException {
Map<ApplicationAttemptId, ApplicationAttemptHistoryData> histData =
historyStore.getApplicationAttempts(appId);
HashMap<ApplicationAttemptId, ApplicationAttemptReport> applicationAttemptsReport =
new HashMap<ApplicationAttemptId, ApplicationAttemptReport>();
for (Entry<ApplicationAttemptId, ApplicationAttemptHistoryData> entry : histData
.entrySet()) {
applicationAttemptsReport.put(entry.getKey(),
convertToApplicationAttemptReport(entry.getValue()));
}
return applicationAttemptsReport;
}
@Override
public ContainerReport getContainer(ContainerId containerId)
throws IOException {
ApplicationReport app =
getApplication(containerId.getApplicationAttemptId().getApplicationId());
return convertToContainerReport(historyStore.getContainer(containerId),
app == null ? null: app.getUser());
}
private ContainerReport convertToContainerReport(
ContainerHistoryData containerHistory, String user) {
// If the container has the aggregated log, add the server root url
String logUrl = WebAppUtils.getAggregatedLogURL(
serverHttpAddress,
containerHistory.getAssignedNode().toString(),
containerHistory.getContainerId().toString(),
containerHistory.getContainerId().toString(),
user);
return ContainerReport.newInstance(containerHistory.getContainerId(),
containerHistory.getAllocatedResource(),
containerHistory.getAssignedNode(), containerHistory.getPriority(),
containerHistory.getStartTime(), containerHistory.getFinishTime(),
containerHistory.getDiagnosticsInfo(), logUrl,
containerHistory.getContainerExitStatus(),
containerHistory.getContainerState(), null);
}
@Override
public Map<ContainerId, ContainerReport> getContainers(
ApplicationAttemptId appAttemptId) throws IOException {
ApplicationReport app =
getApplication(appAttemptId.getApplicationId());
Map<ContainerId, ContainerHistoryData> histData =
historyStore.getContainers(appAttemptId);
HashMap<ContainerId, ContainerReport> containersReport =
new HashMap<ContainerId, ContainerReport>();
for (Entry<ContainerId, ContainerHistoryData> entry : histData.entrySet()) {
containersReport.put(entry.getKey(),
convertToContainerReport(entry.getValue(),
app == null ? null : app.getUser()));
}
return containersReport;
}
@Private
@VisibleForTesting
public ApplicationHistoryStore getHistoryStore() {
return this.historyStore;
}
}