YARN-4129. Refactor the SystemMetricPublisher in RM to better support newer events (Naganarasimha G R via sjlee)
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7959609..7c40f8c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -135,6 +135,9 @@
     YARN-3836. add equals and hashCode to TimelineEntity and other classes in
     the data model (Li Lu via sjlee)
 
+    YARN-4129. Refactor the SystemMetricPublisher in RM to better support
+    newer events (Naganarasimha G R via sjlee)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index dae0353..85e27d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -117,12 +117,12 @@
 
   <!-- Object cast is based on the event type -->
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
      <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
   <Match>
-    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher$TimelineV2EventHandler" />
      <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 6b58942..a5de053 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -64,7 +64,10 @@
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -94,11 +97,11 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -266,8 +269,9 @@
     addService(rmApplicationHistoryWriter);
     rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
 
-    SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
-    addService(systemMetricsPublisher);
+    SystemMetricsPublisher systemMetricsPublisher =
+        createSystemMetricsPublisher();
+    addIfService(systemMetricsPublisher);
     rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
 
     super.serviceInit(this.conf);
@@ -371,7 +375,24 @@
   }
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
-    return new SystemMetricsPublisher(rmContext);
+    boolean timelineServiceEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+    SystemMetricsPublisher publisher = null;
+    if (timelineServiceEnabled) {
+      if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+          YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
+        LOG.info("TimelineService V1 is configured");
+        publisher = new TimelineServiceV1Publisher();
+      } else {
+        LOG.info("TimelineService V2 is configured");
+        publisher = new TimelineServiceV2Publisher(rmContext);
+      }
+    } else {
+      LOG.info("TimelineServicePublisher is not configured");
+      publisher = new NoOpSystemMetricPublisher();
+    }
+    return publisher;
   }
 
   // sanity check for configurations
@@ -494,10 +515,6 @@
       addService(rmApplicationHistoryWriter);
       rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
 
-      SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
-      addService(systemMetricsPublisher);
-      rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
-
       RMTimelineCollectorManager timelineCollectorManager =
           createRMTimelineCollectorManager();
       addService(timelineCollectorManager);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
new file mode 100644
index 0000000..a8c00a4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractSystemMetricsPublisher.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public abstract class AbstractSystemMetricsPublisher extends CompositeService
+    implements SystemMetricsPublisher {
+  private MultiThreadedDispatcher dispatcher;
+
+  protected Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  public AbstractSystemMetricsPublisher(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    dispatcher =
+    new MultiThreadedDispatcher(getConfig().getInt(
+        YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
+    dispatcher.setDrainEventsOnStop();
+    addIfService(dispatcher);
+    super.serviceInit(conf);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static class MultiThreadedDispatcher extends CompositeService
+      implements Dispatcher {
+
+    private List<AsyncDispatcher> dispatchers =
+        new ArrayList<AsyncDispatcher>();
+
+    public MultiThreadedDispatcher(int num) {
+      super(MultiThreadedDispatcher.class.getName());
+      for (int i = 0; i < num; ++i) {
+        AsyncDispatcher dispatcher = createDispatcher();
+        dispatchers.add(dispatcher);
+        addIfService(dispatcher);
+      }
+    }
+
+    @Override
+    public EventHandler getEventHandler() {
+      return new CompositEventHandler();
+    }
+
+    @Override
+    public void register(Class<? extends Enum> eventType,
+        EventHandler handler) {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.register(eventType, handler);
+      }
+    }
+
+    public void setDrainEventsOnStop() {
+      for (AsyncDispatcher dispatcher : dispatchers) {
+        dispatcher.setDrainEventsOnStop();
+      }
+    }
+
+    private class CompositEventHandler implements EventHandler<Event> {
+
+      @Override
+      public void handle(Event event) {
+        // Use hashCode (of ApplicationId) to dispatch the event to the child
+        // dispatcher, such that all the writing events of one application will
+        // be handled by one thread, the scheduled order of the these events
+        // will be preserved
+        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
+        dispatchers.get(index).getEventHandler().handle(event);
+      }
+    }
+
+    protected AsyncDispatcher createDispatcher() {
+      return new AsyncDispatcher();
+    }
+  }
+
+  /**
+   * EventType which is used while publishing the events
+   */
+  protected static enum SystemMetricsEventType {
+    PUBLISH_ENTITY, PUBLISH_APPLICATION_FINISHED_ENTITY
+  }
+
+  /**
+   * TimelinePublishEvent's hash code should be based on application's id this
+   * will ensure all the events related to a particular app goes to particular
+   * thread of MultiThreaded dispatcher.
+   */
+  protected static abstract class TimelinePublishEvent
+      extends AbstractEvent<SystemMetricsEventType> {
+
+    private ApplicationId appId;
+
+    public TimelinePublishEvent(SystemMetricsEventType type,
+        ApplicationId appId) {
+      super(type);
+      this.appId = appId;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+
+    @Override
+    public int hashCode() {
+      return appId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (!(obj instanceof TimelinePublishEvent)) {
+        return false;
+      }
+      TimelinePublishEvent other = (TimelinePublishEvent) obj;
+      if (appId == null) {
+        if (other.appId != null) {
+          return false;
+        }
+      } else if (getType() == null) {
+        if (other.getType() != null) {
+          return false;
+        }
+      } else
+        if (!appId.equals(other.appId) || !getType().equals(other.getType())) {
+        return false;
+      }
+      return true;
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
deleted file mode 100644
index 12145267..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AbstractTimelineServicePublisher.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher.TimelineServicePublisher;
-
-public abstract class AbstractTimelineServicePublisher extends CompositeService
-    implements TimelineServicePublisher, EventHandler<SystemMetricsEvent> {
-
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV2Publisher.class);
-
-  private Configuration conf;
-
-  public AbstractTimelineServicePublisher(String name) {
-    super(name);
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    this.conf = conf;
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  @Override
-  public void handle(SystemMetricsEvent event) {
-    switch (event.getType()) {
-    case APP_CREATED:
-      publishApplicationCreatedEvent((ApplicationCreatedEvent) event);
-      break;
-    case APP_FINISHED:
-      publishApplicationFinishedEvent((ApplicationFinishedEvent) event);
-      break;
-    case APP_UPDATED:
-      publishApplicationUpdatedEvent((ApplicationUpdatedEvent) event);
-      break;
-    case APP_ACLS_UPDATED:
-      publishApplicationACLsUpdatedEvent((ApplicationACLsUpdatedEvent) event);
-      break;
-    case APP_ATTEMPT_REGISTERED:
-      publishAppAttemptRegisteredEvent((AppAttemptRegisteredEvent) event);
-      break;
-    case APP_ATTEMPT_FINISHED:
-      publishAppAttemptFinishedEvent((AppAttemptFinishedEvent) event);
-      break;
-    case CONTAINER_CREATED:
-      publishContainerCreatedEvent((ContainerCreatedEvent) event);
-      break;
-    case CONTAINER_FINISHED:
-      publishContainerFinishedEvent((ContainerFinishedEvent) event);
-      break;
-    default:
-      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-
-  abstract void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event);
-
-  abstract void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event);
-
-  abstract void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event);
-
-  abstract void publishApplicationACLsUpdatedEvent(
-      ApplicationACLsUpdatedEvent event);
-
-  abstract void publishApplicationFinishedEvent(ApplicationFinishedEvent event);
-
-  abstract void publishApplicationCreatedEvent(ApplicationCreatedEvent event);
-
-  abstract void publishContainerCreatedEvent(ContainerCreatedEvent event);
-
-  abstract void publishContainerFinishedEvent(ContainerFinishedEvent event);
-
-  @Override
-  public Dispatcher getDispatcher() {
-    MultiThreadedDispatcher dispatcher =
-        new MultiThreadedDispatcher(
-            conf.getInt(
-                YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
-                YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE));
-    dispatcher.setDrainEventsOnStop();
-    return dispatcher;
-  }
-
-  @Override
-  public boolean publishRMContainerMetrics() {
-    return true;
-  }
-
-  @Override
-  public EventHandler<SystemMetricsEvent> getEventHandler() {
-    return this;
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  public static class MultiThreadedDispatcher extends CompositeService
-      implements Dispatcher {
-
-    private List<AsyncDispatcher> dispatchers =
-        new ArrayList<AsyncDispatcher>();
-
-    public MultiThreadedDispatcher(int num) {
-      super(MultiThreadedDispatcher.class.getName());
-      for (int i = 0; i < num; ++i) {
-        AsyncDispatcher dispatcher = createDispatcher();
-        dispatchers.add(dispatcher);
-        addIfService(dispatcher);
-      }
-    }
-
-    @Override
-    public EventHandler getEventHandler() {
-      return new CompositEventHandler();
-    }
-
-    @Override
-    public void register(Class<? extends Enum> eventType, EventHandler handler) {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.register(eventType, handler);
-      }
-    }
-
-    public void setDrainEventsOnStop() {
-      for (AsyncDispatcher dispatcher : dispatchers) {
-        dispatcher.setDrainEventsOnStop();
-      }
-    }
-
-    private class CompositEventHandler implements EventHandler<Event> {
-
-      @Override
-      public void handle(Event event) {
-        // Use hashCode (of ApplicationId) to dispatch the event to the child
-        // dispatcher, such that all the writing events of one application will
-        // be handled by one thread, the scheduled order of the these events
-        // will be preserved
-        int index = (event.hashCode() & Integer.MAX_VALUE) % dispatchers.size();
-        dispatchers.get(index).getEventHandler().handle(event);
-      }
-    }
-
-    protected AsyncDispatcher createDispatcher() {
-      return new AsyncDispatcher();
-    }
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java
deleted file mode 100644
index 71d9363..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptFinishedEvent.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
-
-public class AppAttemptFinishedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationAttemptId appAttemptId;
-  private String trackingUrl;
-  private String originalTrackingUrl;
-  private String diagnosticsInfo;
-  private FinalApplicationStatus appStatus;
-  private YarnApplicationAttemptState state;
-
-  public AppAttemptFinishedEvent(
-      ApplicationAttemptId appAttemptId,
-      String trackingUrl,
-      String originalTrackingUrl,
-      String diagnosticsInfo,
-      FinalApplicationStatus appStatus,
-      YarnApplicationAttemptState state,
-      long finishedTime) {
-    super(SystemMetricsEventType.APP_ATTEMPT_FINISHED, finishedTime);
-    this.appAttemptId = appAttemptId;
-    // This is the tracking URL after the application attempt is finished
-    this.trackingUrl = trackingUrl;
-    this.originalTrackingUrl = originalTrackingUrl;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.appStatus = appStatus;
-    this.state = state;
-  }
-
-  @Override
-  public int hashCode() {
-    return appAttemptId.getApplicationId().hashCode();
-  }
-
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appAttemptId;
-  }
-
-  public String getTrackingUrl() {
-    return trackingUrl;
-  }
-
-  public String getOriginalTrackingURL() {
-    return originalTrackingUrl;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return appStatus;
-  }
-
-  public YarnApplicationAttemptState getYarnApplicationAttemptState() {
-    return state;
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java
deleted file mode 100644
index 1d0f16d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/AppAttemptRegisteredEvent.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-
-public class AppAttemptRegisteredEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationAttemptId appAttemptId;
-  private String host;
-  private int rpcPort;
-  private String trackingUrl;
-  private String originalTrackingUrl;
-  private ContainerId masterContainerId;
-
-  public AppAttemptRegisteredEvent(
-      ApplicationAttemptId appAttemptId,
-      String host,
-      int rpcPort,
-      String trackingUrl,
-      String originalTrackingUrl,
-      ContainerId masterContainerId,
-      long registeredTime) {
-    super(SystemMetricsEventType.APP_ATTEMPT_REGISTERED, registeredTime);
-    this.appAttemptId = appAttemptId;
-    this.host = host;
-    this.rpcPort = rpcPort;
-    // This is the tracking URL after the application attempt is registered
-    this.trackingUrl = trackingUrl;
-    this.originalTrackingUrl = originalTrackingUrl;
-    this.masterContainerId = masterContainerId;
-  }
-
-  @Override
-  public int hashCode() {
-    return appAttemptId.getApplicationId().hashCode();
-  }
-
-  public ApplicationAttemptId getApplicationAttemptId() {
-    return appAttemptId;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getRpcPort() {
-    return rpcPort;
-  }
-
-  public String getTrackingUrl() {
-    return trackingUrl;
-  }
-
-  public String getOriginalTrackingURL() {
-    return originalTrackingUrl;
-  }
-
-  public ContainerId getMasterContainerId() {
-    return masterContainerId;
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java
deleted file mode 100644
index c8b314c..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationACLsUpdatedEvent.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-
-public class ApplicationACLsUpdatedEvent extends SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String viewAppACLs;
-
-  public ApplicationACLsUpdatedEvent(ApplicationId appId,
-      String viewAppACLs,
-      long updatedTime) {
-    super(SystemMetricsEventType.APP_ACLS_UPDATED, updatedTime);
-    this.appId = appId;
-    this.viewAppACLs = viewAppACLs;
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getViewAppACLs() {
-    return viewAppACLs;
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java
deleted file mode 100644
index a684dfc..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationCreatedEvent.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import java.util.Set;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Priority;
-
-public class ApplicationCreatedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String name;
-  private String type;
-  private String user;
-  private String queue;
-  private long submittedTime;
-  private Set<String> appTags;
-  private boolean unmanagedApplication;
-  private Priority applicationPriority;
-  private String appNodeLabelsExpression;
-  private String amNodeLabelsExpression;
-
-  public ApplicationCreatedEvent(ApplicationId appId,
-      String name,
-      String type,
-      String user,
-      String queue,
-      long submittedTime,
-      long createdTime,
-      Set<String> appTags,
-      boolean unmanagedApplication,
-      Priority applicationPriority,
-      String appNodeLabelsExpression,
-      String amNodeLabelsExpression) {
-    super(SystemMetricsEventType.APP_CREATED, createdTime);
-    this.appId = appId;
-    this.name = name;
-    this.type = type;
-    this.user = user;
-    this.queue = queue;
-    this.submittedTime = submittedTime;
-    this.appTags = appTags;
-    this.unmanagedApplication = unmanagedApplication;
-    this.applicationPriority = applicationPriority;
-    this.appNodeLabelsExpression = appNodeLabelsExpression;
-    this.amNodeLabelsExpression = amNodeLabelsExpression;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getApplicationName() {
-    return name;
-  }
-
-  public String getApplicationType() {
-    return type;
-  }
-
-  public String getUser() {
-    return user;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public long getSubmittedTime() {
-    return submittedTime;
-  }
-
-  public Set<String> getAppTags() {
-    return appTags;
-  }
-
-  public boolean isUnmanagedApp() {
-    return unmanagedApplication;
-  }
-
-  public Priority getApplicationPriority() {
-    return applicationPriority;
-  }
-
-  public String getAppNodeLabelsExpression() {
-    return appNodeLabelsExpression;
-  }
-
-  public String getAmNodeLabelsExpression() {
-    return amNodeLabelsExpression;
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
deleted file mode 100644
index d9241b2..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationFinishedEvent.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
-
-public class ApplicationFinishedEvent extends
-    SystemMetricsEvent {
-
-  private ApplicationId appId;;
-  private String diagnosticsInfo;
-  private FinalApplicationStatus appStatus;
-  private YarnApplicationState state;
-  private ApplicationAttemptId latestAppAttemptId;
-  private RMAppMetrics appMetrics;
-  private RMAppImpl app;
-
-  public ApplicationFinishedEvent(
-      ApplicationId appId,
-      String diagnosticsInfo,
-      FinalApplicationStatus appStatus,
-      YarnApplicationState state,
-      ApplicationAttemptId latestAppAttemptId,
-      long finishedTime,
-      RMAppMetrics appMetrics,
-      RMAppImpl app) {
-    super(SystemMetricsEventType.APP_FINISHED, finishedTime);
-    this.appId = appId;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.appStatus = appStatus;
-    this.latestAppAttemptId = latestAppAttemptId;
-    this.state = state;
-    this.appMetrics = appMetrics;
-    this.app = app;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public RMAppImpl getApp() {
-    return app;
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public FinalApplicationStatus getFinalApplicationStatus() {
-    return appStatus;
-  }
-
-  public YarnApplicationState getYarnApplicationState() {
-    return state;
-  }
-
-  public ApplicationAttemptId getLatestApplicationAttemptId() {
-    return latestAppAttemptId;
-  }
-
-  public RMAppMetrics getAppMetrics() {
-    return appMetrics;
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java
deleted file mode 100644
index 9e5e1fd..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ApplicationUpdatedEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Priority;
-
-public class ApplicationUpdatedEvent extends SystemMetricsEvent {
-
-  private ApplicationId appId;
-  private String queue;
-  private Priority applicationPriority;
-
-  public ApplicationUpdatedEvent(ApplicationId appId, String queue,
-      long updatedTime, Priority applicationPriority) {
-    super(SystemMetricsEventType.APP_UPDATED, updatedTime);
-    this.appId = appId;
-    this.queue = queue;
-    this.applicationPriority = applicationPriority;
-  }
-
-  @Override
-  public int hashCode() {
-    return appId.hashCode();
-  }
-
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  public String getQueue() {
-    return queue;
-  }
-
-  public Priority getApplicationPriority() {
-    return applicationPriority;
-  }
-}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java
deleted file mode 100644
index 05b6781..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerCreatedEvent.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-
-public class ContainerCreatedEvent extends SystemMetricsEvent {
-
-  private ContainerId containerId;
-  private Resource allocatedResource;
-  private NodeId allocatedNode;
-  private Priority allocatedPriority;
-  private String nodeHttpAddress;
-
-  public ContainerCreatedEvent(
-      ContainerId containerId,
-      Resource allocatedResource,
-      NodeId allocatedNode,
-      Priority allocatedPriority,
-      long createdTime,
-      String nodeHttpAddress) {
-    super(SystemMetricsEventType.CONTAINER_CREATED, createdTime);
-    this.containerId = containerId;
-    this.allocatedResource = allocatedResource;
-    this.allocatedNode = allocatedNode;
-    this.allocatedPriority = allocatedPriority;
-    this.nodeHttpAddress = nodeHttpAddress;
-  }
-
-  @Override
-  public int hashCode() {
-    return containerId.getApplicationAttemptId().getApplicationId().hashCode();
-  }
-
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  public Resource getAllocatedResource() {
-    return allocatedResource;
-  }
-
-  public NodeId getAllocatedNode() {
-    return allocatedNode;
-  }
-
-  public Priority getAllocatedPriority() {
-    return allocatedPriority;
-  }
-
-  public String getNodeHttpAddress() {
-    return nodeHttpAddress;
-  }
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java
deleted file mode 100644
index aafd760..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/ContainerFinishedEvent.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-
-public class ContainerFinishedEvent extends SystemMetricsEvent {
-
-  private ContainerId containerId;
-  private String diagnosticsInfo;
-  private int containerExitStatus;
-  private ContainerState state;
-
-  public ContainerFinishedEvent(
-      ContainerId containerId,
-      String diagnosticsInfo,
-      int containerExitStatus,
-      ContainerState state,
-      long finishedTime) {
-    super(SystemMetricsEventType.CONTAINER_FINISHED, finishedTime);
-    this.containerId = containerId;
-    this.diagnosticsInfo = diagnosticsInfo;
-    this.containerExitStatus = containerExitStatus;
-    this.state = state;
-  }
-
-  @Override
-  public int hashCode() {
-    return containerId.getApplicationAttemptId().getApplicationId().hashCode();
-  }
-
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  public String getDiagnosticsInfo() {
-    return diagnosticsInfo;
-  }
-
-  public int getContainerExitStatus() {
-    return containerExitStatus;
-  }
-
-  public ContainerState getContainerState() {
-    return state;
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java
new file mode 100644
index 0000000..1810df1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/NoOpSystemMetricPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.metrics;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * This class does nothing when any of the methods are invoked on
+ * SystemMetricsPublisher
+ */
+public class NoOpSystemMetricPublisher implements SystemMetricsPublisher{
+
+  @Override
+  public void appCreated(RMApp app, long createdTime) {
+  }
+
+  @Override
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+  }
+
+  @Override
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+  }
+
+  @Override
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
+  }
+
+  @Override
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+  }
+
+  @Override
+  public void containerCreated(RMContainer container, long createdTime) {
+  }
+
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+  }
+
+  @Override
+  public void appUpdated(RMApp app, long currentTimeMillis) {
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java
deleted file mode 100644
index 1847396..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEvent.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-
-public class SystemMetricsEvent extends AbstractEvent<SystemMetricsEventType> {
-
-  public SystemMetricsEvent(SystemMetricsEventType type) {
-    super(type);
-  }
-
-  public SystemMetricsEvent(SystemMetricsEventType type, long timestamp) {
-    super(type, timestamp);
-  }
-
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java
deleted file mode 100644
index c11034e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsEventType.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.metrics;
-
-
-public enum SystemMetricsEventType {
-  // app events
-  APP_CREATED,
-  APP_FINISHED,
-  APP_ACLS_UPDATED,
-  APP_UPDATED,
-
-  // app attempt events
-  APP_ATTEMPT_REGISTERED,
-  APP_ATTEMPT_FINISHED,
-
-  // container events
-  CONTAINER_CREATED,
-  CONTAINER_FINISHED
-}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
index 6ab7838..f895bba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
@@ -18,244 +18,28 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 
-import com.google.common.annotations.VisibleForTesting;
+public interface SystemMetricsPublisher {
 
-/**
- * The class that helps RM publish metrics to the timeline server. RM will
- * always invoke the methods of this class regardless the service is enabled or
- * not. If it is disabled, publishing requests will be ignored silently.
- */
-@Private
-@Unstable
-public class SystemMetricsPublisher extends CompositeService {
+  void appCreated(RMApp app, long createdTime);
 
-  private static final Log LOG = LogFactory
-      .getLog(SystemMetricsPublisher.class);
+  void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime);
 
-  private Dispatcher dispatcher;
-  private boolean publishSystemMetrics;
-  private boolean publishContainerMetrics;
-  protected RMContext rmContext;
+  void appUpdated(RMApp app, long updatedTime);
 
-  public SystemMetricsPublisher(RMContext rmContext) {
-    super(SystemMetricsPublisher.class.getName());
-    this.rmContext = rmContext;
-  }
+  void appFinished(RMApp app, RMAppState state, long finishedTime);
 
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetrics =
-        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
-    if (publishSystemMetrics) {
-      TimelineServicePublisher timelineServicePublisher =
-          getTimelineServicePublisher(conf);
-      if (timelineServicePublisher != null) {
-        addService(timelineServicePublisher);
-        // init required to be called so that other methods of
-        // TimelineServicePublisher can be utilized
-        timelineServicePublisher.init(conf);
-        dispatcher = createDispatcher(timelineServicePublisher);
-        publishContainerMetrics =
-            timelineServicePublisher.publishRMContainerMetrics();
-        dispatcher.register(SystemMetricsEventType.class,
-            timelineServicePublisher.getEventHandler());
-        addIfService(dispatcher);
-      } else {
-        LOG.info("TimelineServicePublisher is not configured");
-        publishSystemMetrics = false;
-      }
-      LOG.info("YARN system metrics publishing service is enabled");
-    } else {
-      LOG.info("YARN system metrics publishing service is not enabled");
-    }
-    super.serviceInit(conf);
-  }
+  void appAttemptRegistered(RMAppAttempt appAttempt, long registeredTime);
 
-  @VisibleForTesting
-  Dispatcher createDispatcher(TimelineServicePublisher timelineServicePublisher) {
-    return timelineServicePublisher.getDispatcher();
-  }
+  void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime);
 
-  TimelineServicePublisher getTimelineServicePublisher(Configuration conf) {
-    if (conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-        YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
-      return new TimelineServiceV1Publisher();
-    } else if (conf.getBoolean(
-        YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED)) {
-      return new TimelineServiceV2Publisher(rmContext);
-    }
-    return null;
-  }
+  void containerCreated(RMContainer container, long createdTime);
 
-  @SuppressWarnings("unchecked")
-  public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetrics) {
-      ApplicationSubmissionContext appSubmissionContext =
-          app.getApplicationSubmissionContext();
-      dispatcher.getEventHandler().handle(
-          new ApplicationCreatedEvent(
-              app.getApplicationId(),
-              app.getName(),
-              app.getApplicationType(),
-              app.getUser(),
-              app.getQueue(),
-              app.getSubmitTime(),
-              createdTime, app.getApplicationTags(),
-              appSubmissionContext.getUnmanagedAM(),
-              appSubmissionContext.getPriority(),
-              app.getAppNodeLabelExpression(),
-              app.getAmNodeLabelExpression()));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appUpdated(RMApp app, long updatedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler()
-          .handle(
-              new ApplicationUpdatedEvent(app.getApplicationId(), app
-                  .getQueue(), updatedTime, app
-                  .getApplicationSubmissionContext().getPriority()));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ApplicationFinishedEvent(
-              app.getApplicationId(),
-              app.getDiagnostics().toString(),
-              app.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationState(state),
-              app.getCurrentAppAttempt() == null ?
-                  null : app.getCurrentAppAttempt().getAppAttemptId(),
-              finishedTime,
-              app.getRMAppMetrics(),
-              (RMAppImpl)app));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appACLsUpdated(RMApp app, String appViewACLs,
-      long updatedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ApplicationACLsUpdatedEvent(
-              app.getApplicationId(),
-              appViewACLs == null ? "" : appViewACLs,
-              updatedTime));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appAttemptRegistered(RMAppAttempt appAttempt,
-      long registeredTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new AppAttemptRegisteredEvent(
-              appAttempt.getAppAttemptId(),
-              appAttempt.getHost(),
-              appAttempt.getRpcPort(),
-              appAttempt.getTrackingUrl(),
-              appAttempt.getOriginalTrackingUrl(),
-              appAttempt.getMasterContainer().getId(),
-              registeredTime));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void appAttemptFinished(RMAppAttempt appAttempt,
-      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetrics) {
-      dispatcher.getEventHandler().handle(
-          new AppAttemptFinishedEvent(
-              appAttempt.getAppAttemptId(),
-              appAttempt.getTrackingUrl(),
-              appAttempt.getOriginalTrackingUrl(),
-              appAttempt.getDiagnostics(),
-              // app will get the final status from app attempt, or create one
-              // based on app state if it doesn't exist
-              app.getFinalApplicationStatus(),
-              RMServerUtils.createApplicationAttemptState(appAttemtpState),
-              finishedTime));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void containerCreated(RMContainer container, long createdTime) {
-    if (publishContainerMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ContainerCreatedEvent(
-              container.getContainerId(),
-              container.getAllocatedResource(),
-              container.getAllocatedNode(),
-              container.getAllocatedPriority(),
-              createdTime, container.getNodeHttpAddress()));
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishContainerMetrics) {
-      dispatcher.getEventHandler().handle(
-          new ContainerFinishedEvent(
-              container.getContainerId(),
-              container.getDiagnosticsInfo(),
-              container.getContainerExitStatus(),
-              container.getContainerState(),
-              finishedTime));
-    }
-  }
-
-  @VisibleForTesting
-  boolean isPublishContainerMetrics() {
-    return publishContainerMetrics;
-  }
-
-  @VisibleForTesting
-  Dispatcher getDispatcher() {
-    return dispatcher;
-  }
-
-  interface TimelineServicePublisher extends Service {
-    /**
-     * @return the Dispatcher which needs to be used to dispatch events
-     */
-    Dispatcher getDispatcher();
-
-    /**
-     * @return true if RMContainerMetricsNeeds to be sent
-     */
-    boolean publishRMContainerMetrics();
-
-    /**
-     * @return EventHandler which needs to be registered to the dispatcher to
-     *         handle the SystemMetricsEvent
-     */
-    EventHandler<SystemMetricsEvent> getEventHandler();
-  }
+  void containerFinished(RMContainer container, long finishedTime);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index 0dd1bca..a236e4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -30,17 +30,23 @@
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
-public class TimelineServiceV1Publisher extends
-    AbstractTimelineServicePublisher {
+public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
 
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV1Publisher.class);
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV1Publisher.class);
 
   public TimelineServiceV1Publisher() {
     super("TimelineserviceV1Publisher");
@@ -49,62 +55,70 @@
   private TimelineClient client;
 
   @Override
-  public void serviceInit(Configuration conf) throws Exception {
+  protected void serviceInit(Configuration conf) throws Exception {
     client = TimelineClient.createTimelineClient();
     addIfService(client);
     super.serviceInit(conf);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV1EventHandler());
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appCreated(RMApp app, long createdTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
     entityInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
+        app.getQueue());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
         ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-        event.isUnmanagedApp());
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-        event.getApplicationPriority().getPriority());
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    entityInfo.put(ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
+        app.getAmNodeLabelExpression());
+    entityInfo.put(ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setOtherInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    String latestApplicationAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId().toString();
+    if (latestApplicationAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          latestApplicationAttemptId);
     }
-    RMAppMetrics appMetrics = event.getAppMetrics();
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addOtherInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addOtherInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -112,39 +126,157 @@
     tEvent.setEventInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    // sync sending of finish event to avoid possibility of saving application
+    // finished state in RMStateStore save without publishing in ATS
+    putEntity(entity);// sync event so that ATS update is done without fail
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appUpdated(RMApp app, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
     tEvent.setEventInfo(eventInfo);
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    TimelineEntity entity = createApplicationEntity(event.getApplicationId());
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    TimelineEntity entity = createApplicationEntity(app.getApplicationId());
 
     TimelineEvent tEvent = new TimelineEvent();
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
+        (appViewACLs == null) ? "" : appViewACLs);
     entity.setOtherInfo(entityInfo);
     tEvent.setEventType(ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(updatedTime);
 
     entity.addEvent(tEvent);
-    putEntity(entity);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
+    TimelineEntity entity =
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
+    tEvent.setTimestamp(registeredTime);
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        appAttempt.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        appAttempt.getOriginalTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
+        appAttempt.getHost());
+    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+    TimelineEntity entity =
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(finishedTime);
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
+        appAttempt.getTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
+        appAttempt.getOriginalTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        appAttempt.getDiagnostics());
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    getDispatcher().getEventHandler().handle(
+        new TimelineV1PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerCreated(RMContainer container, long createdTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+        container.getAllocatedResource().getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+        container.getAllocatedResource().getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+        container.getAllocatedNode().getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+        container.getAllocatedNode().getPort());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        container.getAllocatedPriority().getPriority());
+    entityInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        container.getNodeHttpAddress());
+    entity.setOtherInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(createdTime);
+
+    entity.addEvent(tEvent);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+    TimelineEntity entity = createContainerEntity(container.getContainerId());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(finishedTime);
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        container.getDiagnosticsInfo());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        container.getContainerExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+        container.getContainerState().toString());
+    tEvent.setEventInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+    getDispatcher().getEventHandler().handle(new TimelineV1PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+            .getContainerId().getApplicationAttemptId().getApplicationId()));
   }
 
   private static TimelineEntity createApplicationEntity(
@@ -155,55 +287,6 @@
     return entity;
   }
 
-  @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
-    eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
-        .getMasterContainerId().toString());
-    tEvent.setEventInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
-    TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
-    eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
-    tEvent.setEventInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
   private static TimelineEntity createAppAttemptEntity(
       ApplicationAttemptId appAttemptId) {
     TimelineEntity entity = new TimelineEntity();
@@ -214,53 +297,6 @@
     return entity;
   }
 
-  @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-    Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
-        event.getAllocatedResource().getMemory());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
-    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    entityInfo.put(
-        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
-    entity.setOtherInfo(entityInfo);
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
-  @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setEventType(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
-    tEvent.setEventInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity);
-  }
-
   private static TimelineEntity createContainerEntity(ContainerId containerId) {
     TimelineEntity entity = new TimelineEntity();
     entity.setEntityType(ContainerMetricsConstants.ENTITY_TYPE);
@@ -283,4 +319,26 @@
           + entity.getEntityId() + "]", e);
     }
   }
+
+  private class TimelineV1PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV1PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
+  }
+
+  private class TimelineV1EventHandler
+      implements EventHandler<TimelineV1PublishEvent> {
+    @Override
+    public void handle(TimelineV1PublishEvent event) {
+      putEntity(event.getEntity());
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
index 1a96e61..e0c593d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV2Publisher.java
@@ -25,7 +25,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,78 +38,100 @@
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class is responsible for posting application, appattempt & Container
  * lifecycle related events to timeline service V2
  */
 @Private
 @Unstable
-public class TimelineServiceV2Publisher extends
-    AbstractTimelineServicePublisher {
-  private static final Log LOG = LogFactory
-      .getLog(TimelineServiceV2Publisher.class);
+public class TimelineServiceV2Publisher extends AbstractSystemMetricsPublisher {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineServiceV2Publisher.class);
   protected RMTimelineCollectorManager rmTimelineCollectorManager;
+  private boolean publishContainerMetrics;
 
   public TimelineServiceV2Publisher(RMContext rmContext) {
     super("TimelineserviceV2Publisher");
     rmTimelineCollectorManager = rmContext.getRMTimelineCollectorManager();
   }
 
-  private boolean publishContainerMetrics;
-
   @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishContainerMetrics =
-        conf.getBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
-            YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
-    super.serviceInit(conf);
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    publishContainerMetrics = getConfig().getBoolean(
+        YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
+        YarnConfiguration.DEFAULT_RM_PUBLISH_CONTAINER_METRICS_ENABLED);
+    getDispatcher().register(SystemMetricsEventType.class,
+        new TimelineV2EventHandler());
   }
 
+  @VisibleForTesting
+  boolean isPublishContainerMetrics() {
+    return publishContainerMetrics;
+  }
+
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationCreatedEvent(ApplicationCreatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    entity.setQueue(event.getQueue());
+  public void appCreated(RMApp app, long createdTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    entity.setQueue(app.getQueue());
+    entity.setCreatedTime(createdTime);
+
     Map<String, Object> entityInfo = new HashMap<String, Object>();
-    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO,
-        event.getApplicationName());
+    entityInfo.put(ApplicationMetricsConstants.NAME_ENTITY_INFO, app.getName());
     entityInfo.put(ApplicationMetricsConstants.TYPE_ENTITY_INFO,
-        event.getApplicationType());
-    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO,
-        event.getUser());
+        app.getApplicationType());
+    entityInfo.put(ApplicationMetricsConstants.USER_ENTITY_INFO, app.getUser());
     entityInfo.put(ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO,
-        event.getSubmittedTime());
+        app.getSubmitTime());
     entityInfo.put(ApplicationMetricsConstants.APP_TAGS_INFO,
-        event.getAppTags());
+        app.getApplicationTags());
     entityInfo.put(
-      ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
-      event.isUnmanagedApp());
+        ApplicationMetricsConstants.UNMANAGED_APPLICATION_ENTITY_INFO,
+        app.getApplicationSubmissionContext().getUnmanagedAM());
     entityInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
-      event.getApplicationPriority().getPriority());
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.AM_NODE_LABEL_EXPRESSION,
+        app.getAmNodeLabelExpression());
+    entity.getConfigs().put(
+        ApplicationMetricsConstants.APP_NODE_LABEL_EXPRESSION,
+        app.getAppNodeLabelExpression());
     entity.setInfo(entityInfo);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.CREATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(createdTime);
     entity.addEvent(tEvent);
 
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationFinishedEvent(ApplicationFinishedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    RMAppMetrics appMetrics = event.getAppMetrics();
+  public void appFinished(RMApp app, RMAppState state, long finishedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    RMAppMetrics appMetrics = app.getRMAppMetrics();
     entity.addInfo(ApplicationMetricsConstants.APP_CPU_METRICS,
         appMetrics.getVcoreSeconds());
     entity.addInfo(ApplicationMetricsConstants.APP_MEM_METRICS,
@@ -118,54 +139,57 @@
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(ApplicationMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationState().toString());
-    if (event.getLatestApplicationAttemptId() != null) {
+        app.getDiagnostics().toString());
+    eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
+        RMServerUtils.createApplicationState(state).toString());
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() == null
+        ? null : app.getCurrentAppAttempt().getAppAttemptId();
+    if (appAttemptId != null) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
-          event.getLatestApplicationAttemptId().toString());
+          appAttemptId.toString());
     }
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
 
-    //cleaning up the collector cached
-    event.getApp().stopTimelineCollector();
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishApplicationUpdatedEvent(ApplicationUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
-        event.getQueue());
-    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO, event
-        .getApplicationPriority().getPriority());
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    tEvent.setInfo(eventInfo);
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationId());
-  }
-
-  @Override
-  void publishApplicationACLsUpdatedEvent(ApplicationACLsUpdatedEvent event) {
-    ApplicationEntity entity =
-        createApplicationEntity(event.getApplicationId());
+  public void appACLsUpdated(RMApp app, String appViewACLs, long updatedTime) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
     Map<String, Object> entityInfo = new HashMap<String, Object>();
     entityInfo.put(ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO,
-        event.getViewAppACLs());
+        appViewACLs);
     entity.setInfo(entityInfo);
 
-    putEntity(entity, event.getApplicationId());
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void appUpdated(RMApp app, long currentTimeMillis) {
+    ApplicationEntity entity = createApplicationEntity(app.getApplicationId());
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ApplicationMetricsConstants.QUEUE_ENTITY_INFO,
+        app.getQueue());
+    eventInfo.put(ApplicationMetricsConstants.APPLICATION_PRIORITY_INFO,
+        app.getApplicationSubmissionContext().getPriority().getPriority());
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ApplicationMetricsConstants.UPDATED_EVENT_TYPE);
+    tEvent.setTimestamp(currentTimeMillis);
+    tEvent.setInfo(eventInfo);
+    entity.addEvent(tEvent);
+    getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+        SystemMetricsEventType.PUBLISH_ENTITY, entity, app.getApplicationId()));
   }
 
   private static ApplicationEntity createApplicationEntity(
@@ -175,104 +199,135 @@
     return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptRegisteredEvent(AppAttemptRegisteredEvent event) {
+  public void appAttemptRegistered(RMAppAttempt appAttempt,
+      long registeredTime) {
     TimelineEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
+    entity.setCreatedTime(registeredTime);
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(registeredTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
-    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO, event.getHost());
+        appAttempt.getOriginalTrackingUrl());
+    eventInfo.put(AppAttemptMetricsConstants.HOST_EVENT_INFO,
+        appAttempt.getHost());
     eventInfo.put(AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO,
-        event.getRpcPort());
-    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO, event
-        .getMasterContainerId().toString());
+        appAttempt.getRpcPort());
+    eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO,
+        appAttempt.getMasterContainer().getId().toString());
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishAppAttemptFinishedEvent(AppAttemptFinishedEvent event) {
+  public void appAttemptFinished(RMAppAttempt appAttempt,
+      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+
     ApplicationAttemptEntity entity =
-        createAppAttemptEntity(event.getApplicationAttemptId());
+        createAppAttemptEntity(appAttempt.getAppAttemptId());
 
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setId(AppAttemptMetricsConstants.FINISHED_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
+    tEvent.setTimestamp(finishedTime);
     Map<String, Object> eventInfo = new HashMap<String, Object>();
     eventInfo.put(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO,
-        event.getTrackingUrl());
+        appAttempt.getTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO,
-        event.getOriginalTrackingURL());
+        appAttempt.getOriginalTrackingUrl());
     eventInfo.put(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO, event
-        .getFinalApplicationStatus().toString());
-    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, event
-        .getYarnApplicationAttemptState().toString());
+        appAttempt.getDiagnostics());
+    // app will get the final status from app attempt, or create one
+    // based on app state if it doesn't exist
+    eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO,
+        app.getFinalApplicationStatus().toString());
+    eventInfo.put(AppAttemptMetricsConstants.STATE_EVENT_INFO, RMServerUtils
+        .createApplicationAttemptState(appAttemtpState).toString());
     tEvent.setInfo(eventInfo);
 
     entity.addEvent(tEvent);
-    putEntity(entity, event.getApplicationAttemptId().getApplicationId());
+    getDispatcher().getEventHandler().handle(
+        new TimelineV2PublishEvent(SystemMetricsEventType.PUBLISH_ENTITY,
+            entity, appAttempt.getAppAttemptId().getApplicationId()));
   }
 
-  @Override
-  void publishContainerCreatedEvent(ContainerCreatedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
-
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    // updated as event info instead of entity info, as entity info is updated
-    // by NM
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO, event
-        .getAllocatedResource().getMemory());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO, event
-        .getAllocatedResource().getVirtualCores());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO, event
-        .getAllocatedNode().getHost());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, event
-        .getAllocatedNode().getPort());
-    eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
-        event.getAllocatedPriority().getPriority());
-    eventInfo.put(
-        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
-        event.getNodeHttpAddress());
-    tEvent.setInfo(eventInfo);
-
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+  private static ApplicationAttemptEntity createAppAttemptEntity(
+      ApplicationAttemptId appAttemptId) {
+    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
+    entity.setId(appAttemptId.toString());
+    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
+        appAttemptId.getApplicationId().toString()));
+    return entity;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  void publishContainerFinishedEvent(ContainerFinishedEvent event) {
-    TimelineEntity entity = createContainerEntity(event.getContainerId());
+  public void containerCreated(RMContainer container, long createdTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+      entity.setCreatedTime(createdTime);
 
-    TimelineEvent tEvent = new TimelineEvent();
-    tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
-    tEvent.setTimestamp(event.getTimestamp());
-    Map<String, Object> eventInfo = new HashMap<String, Object>();
-    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
-        event.getDiagnosticsInfo());
-    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
-        event.getContainerExitStatus());
-    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, event
-        .getContainerState().toString());
-    tEvent.setInfo(eventInfo);
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.CREATED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(createdTime);
+      // updated as event info instead of entity info, as entity info is updated
+      // by NM
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+          container.getAllocatedResource().getMemory());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+          container.getAllocatedResource().getVirtualCores());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+          container.getAllocatedNode().getHost());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+          container.getAllocatedNode().getPort());
+      eventInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+          container.getAllocatedPriority().getPriority());
+      eventInfo.put(
+          ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+          container.getNodeHttpAddress());
+      tEvent.setInfo(eventInfo);
 
-    entity.addEvent(tEvent);
-    putEntity(entity, event.getContainerId().getApplicationAttemptId()
-        .getApplicationId());
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void containerFinished(RMContainer container, long finishedTime) {
+    if (publishContainerMetrics) {
+      TimelineEntity entity = createContainerEntity(container.getContainerId());
+
+      TimelineEvent tEvent = new TimelineEvent();
+      tEvent.setId(ContainerMetricsConstants.FINISHED_IN_RM_EVENT_TYPE);
+      tEvent.setTimestamp(finishedTime);
+      Map<String, Object> eventInfo = new HashMap<String, Object>();
+      eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+          container.getDiagnosticsInfo());
+      eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+          container.getContainerExitStatus());
+      eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+          container.getContainerState().toString());
+      tEvent.setInfo(eventInfo);
+
+      entity.addEvent(tEvent);
+      getDispatcher().getEventHandler().handle(new TimelineV2PublishEvent(
+          SystemMetricsEventType.PUBLISH_ENTITY, entity, container
+              .getContainerId().getApplicationAttemptId().getApplicationId()));
+    }
   }
 
   private static ContainerEntity createContainerEntity(ContainerId containerId) {
@@ -300,17 +355,48 @@
     }
   }
 
-  private static ApplicationAttemptEntity createAppAttemptEntity(
-      ApplicationAttemptId appAttemptId) {
-    ApplicationAttemptEntity entity = new ApplicationAttemptEntity();
-    entity.setId(appAttemptId.toString());
-    entity.setParent(new Identifier(TimelineEntityType.YARN_APPLICATION.name(),
-        appAttemptId.getApplicationId().toString()));
-    return entity;
+  private class ApplicationFinishPublishEvent extends TimelineV2PublishEvent {
+    private RMAppImpl app;
+
+    public ApplicationFinishPublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, RMAppImpl app) {
+      super(type, entity, app.getApplicationId());
+      this.app = app;
+    }
+
+    public RMAppImpl getRMAppImpl() {
+      return app;
+    }
   }
 
-  @Override
-  public boolean publishRMContainerMetrics() {
-    return publishContainerMetrics;
+  private class TimelineV2EventHandler
+      implements EventHandler<TimelineV2PublishEvent> {
+    @Override
+    public void handle(TimelineV2PublishEvent event) {
+      switch (event.getType()) {
+      case PUBLISH_APPLICATION_FINISHED_ENTITY:
+        putEntity(event.getEntity(), event.getApplicationId());
+        ((ApplicationFinishPublishEvent) event).getRMAppImpl()
+            .stopTimelineCollector();
+        break;
+      default:
+        putEntity(event.getEntity(), event.getApplicationId());
+        break;
+      }
+    }
+  }
+
+  private class TimelineV2PublishEvent extends TimelinePublishEvent {
+    private TimelineEntity entity;
+
+    public TimelineV2PublishEvent(SystemMetricsEventType type,
+        TimelineEntity entity, ApplicationId appId) {
+      super(type, appId);
+      this.entity = entity;
+    }
+
+    public TimelineEntity getEntity() {
+      return entity;
+    }
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index 0418f37..d6fdb3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -47,7 +47,6 @@
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -68,7 +67,7 @@
 public class TestSystemMetricsPublisher {
 
   private static ApplicationHistoryServer timelineServer;
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV1Publisher metricsPublisher;
   private static TimelineStore store;
 
   @BeforeClass
@@ -89,7 +88,7 @@
     timelineServer.start();
     store = timelineServer.getTimelineStore();
 
-    metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+    metricsPublisher = new TimelineServiceV1Publisher();
     metricsPublisher.init(conf);
     metricsPublisher.start();
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index ac20335..20a5b13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -49,7 +49,6 @@
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher.MultiThreadedDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
@@ -75,7 +74,7 @@
       TestSystemMetricsPublisherForV2.class.getName() + "-localDir")
       .getAbsoluteFile();
 
-  private static SystemMetricsPublisher metricsPublisher;
+  private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
   private static final String DEFAULT_FLOW_VERSION = "1";
   private static final long DEFAULT_FLOW_RUN = 1;
@@ -103,10 +102,11 @@
     rmTimelineCollectorManager.init(conf);
     rmTimelineCollectorManager.start();
 
-    metricsPublisher = new SystemMetricsPublisher(rmContext) {
+    dispatcher.init(conf);
+    dispatcher.start();
+    metricsPublisher = new TimelineServiceV2Publisher(rmContext) {
       @Override
-      Dispatcher createDispatcher(
-          TimelineServicePublisher timelineServicePublisher) {
+      protected Dispatcher getDispatcher() {
         return dispatcher;
       }
     };
@@ -150,8 +150,8 @@
   @Test
   public void testSystemMetricPublisherInitialization() {
     @SuppressWarnings("resource")
-    SystemMetricsPublisher metricsPublisher =
-        new SystemMetricsPublisher(mock(RMContext.class));
+    TimelineServiceV2Publisher metricsPublisher =
+        new TimelineServiceV2Publisher(mock(RMContext.class));
     try {
       Configuration conf = getTimelineV2Conf();
       conf.setBoolean(YarnConfiguration.RM_PUBLISH_CONTAINER_METRICS_ENABLED,
@@ -163,20 +163,18 @@
 
       metricsPublisher.stop();
 
-      metricsPublisher = new SystemMetricsPublisher(mock(RMContext.class));
+      metricsPublisher = new TimelineServiceV2Publisher(mock(RMContext.class));
       conf = getTimelineV2Conf();
       metricsPublisher.init(conf);
+      metricsPublisher.start();
       assertTrue("Expected to publish container Metrics from RM",
           metricsPublisher.isPublishContainerMetrics());
-      assertTrue(
-          "MultiThreadedDispatcher expected when container Metrics is not published",
-          metricsPublisher.getDispatcher() instanceof MultiThreadedDispatcher);
     } finally {
       metricsPublisher.stop();
     }
   }
 
-  @Test(timeout = 1000000)
+  @Test(timeout = 10000)
   public void testPublishApplicationMetrics() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(0, 1);
     RMApp app = createAppAndRegister(appId);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7637410..a642a78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;