blob: 1b8cce3d6e55244ab546a1b9211204919553ba20 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.metrics;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import com.google.common.annotations.VisibleForTesting;
/**
* The class that helps RM publish metrics to the timeline server. RM will
* always invoke the methods of this class regardless the service is enabled or
* not. If it is disabled, publishing requests will be ignored silently.
*/
@Private
@Unstable
public class SystemMetricsPublisher extends CompositeService {
private static final Log LOG = LogFactory
.getLog(SystemMetricsPublisher.class);
private Dispatcher dispatcher;
private boolean publishSystemMetrics;
private boolean publishContainerMetrics;
protected RMContext rmContext;
public SystemMetricsPublisher(RMContext rmContext) {
super(SystemMetricsPublisher.class.getName());
this.rmContext = rmContext;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
publishSystemMetrics =
conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
if (publishSystemMetrics) {
TimelineServicePublisher timelineServicePublisher =
getTimelineServicePublisher(conf);
if (timelineServicePublisher != null) {
addService(timelineServicePublisher);
// init required to be called so that other methods of
// TimelineServicePublisher can be utilized
timelineServicePublisher.init(conf);
dispatcher = createDispatcher(timelineServicePublisher);
publishContainerMetrics =
timelineServicePublisher.publishRMContainerMetrics();
dispatcher.register(SystemMetricsEventType.class,
timelineServicePublisher.getEventHandler());
addIfService(dispatcher);
} else {
LOG.info("TimelineServicePublisher is not configured");
publishSystemMetrics = false;
}
LOG.info("YARN system metrics publishing service is enabled");
} else {
LOG.info("YARN system metrics publishing service is not enabled");
}
super.serviceInit(conf);
}
@VisibleForTesting
Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
return timelineServicePublisher.getDispatcher();
}
TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV1Publisher();
} else if (conf.getBoolean(
YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
return new TimelineServiceV2Publisher(rmContext);
}
return null;
}
@SuppressWarnings("unchecked")
public void appCreated(RMApp app, long createdTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationCreatedEvent(
app.getApplicationId(),
app.getName(),
app.getApplicationType(),
app.getUser(),
app.getQueue(),
app.getSubmitTime(),
createdTime, app.getApplicationTags(),
app.getApplicationSubmissionContext().getUnmanagedAM(),
app.getApplicationSubmissionContext().getPriority()));
}
}
@SuppressWarnings("unchecked")
public void appFinished(RMApp app, RMAppState state, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationFinishedEvent(
app.getApplicationId(),
app.getDiagnostics().toString(),
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationState(state),
app.getCurrentAppAttempt() == null ?
null : app.getCurrentAppAttempt().getAppAttemptId(),
finishedTime,
app.getRMAppMetrics(),
(RMAppImpl)app));
}
}
@SuppressWarnings("unchecked")
public void appACLsUpdated(RMApp app, String appViewACLs,
long updatedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new ApplicationACLsUpdatedEvent(
app.getApplicationId(),
appViewACLs == null ? "" : appViewACLs,
updatedTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptRegistered(RMAppAttempt appAttempt,
long registeredTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptRegisteredEvent(
appAttempt.getAppAttemptId(),
appAttempt.getHost(),
appAttempt.getRpcPort(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
appAttempt.getMasterContainer().getId(),
registeredTime));
}
}
@SuppressWarnings("unchecked")
public void appAttemptFinished(RMAppAttempt appAttempt,
RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
if (publishSystemMetrics) {
dispatcher.getEventHandler().handle(
new AppAttemptFinishedEvent(
appAttempt.getAppAttemptId(),
appAttempt.getTrackingUrl(),
appAttempt.getOriginalTrackingUrl(),
appAttempt.getDiagnostics(),
// app will get the final status from app attempt, or create one
// based on app state if it doesn't exist
app.getFinalApplicationStatus(),
RMServerUtils.createApplicationAttemptState(appAttemtpState),
finishedTime));
}
}
@SuppressWarnings("unchecked")
public void containerCreated(RMContainer container, long createdTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerCreatedEvent(
container.getContainerId(),
container.getAllocatedResource(),
container.getAllocatedNode(),
container.getAllocatedPriority(),
createdTime, container.getNodeHttpAddress()));
}
}
@SuppressWarnings("unchecked")
public void containerFinished(RMContainer container, long finishedTime) {
if (publishContainerMetrics) {
dispatcher.getEventHandler().handle(
new ContainerFinishedEvent(
container.getContainerId(),
container.getDiagnosticsInfo(),
container.getContainerExitStatus(),
container.getContainerState(),
finishedTime));
}
}
@VisibleForTesting
boolean isPublishContainerMetrics() {
return publishContainerMetrics;
}
@VisibleForTesting
Dispatcher getDispatcher() {
return dispatcher;
}
interface TimelineServicePublisher extends Service {
/**
* @return the Dispatcher which needs to be used to dispatch events
*/
Dispatcher getDispatcher();
/**
* @return true if RMContainerMetricsNeeds to be sent
*/
boolean publishRMContainerMetrics();
/**
* @return EventHandler which needs to be registered to the dispatcher to
* handle the SystemMetricsEvent
*/
EventHandler<SystemMetricsEvent> getEventHandler();
}
}