Merge remote-tracking branch 'apache/trunk' into HDDS-48
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
index 3826f67..6db085a 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
+++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
@@ -88,7 +88,7 @@
 # Extra Java runtime options for all Hadoop commands. We don't support
 # IPv6 yet/still, so by default the preference is set to IPv4.
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true"
-# For Kerberos debugging, an extended option set logs more invormation
+# For Kerberos debugging, an extended option set logs more information
 # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug"
 
 # Some parts of the shell code may do special things dependent upon
@@ -120,9 +120,9 @@
 #
 # By default, Apache Hadoop overrides Java's CLASSPATH
 # environment variable.  It is configured such
-# that it sarts out blank with new entries added after passing
+# that it starts out blank with new entries added after passing
 # a series of checks (file/dir exists, not already listed aka
-# de-deduplication).  During de-depulication, wildcards and/or
+# de-deduplication).  During de-deduplication, wildcards and/or
 # directories are *NOT* expanded to keep it simple. Therefore,
 # if the computed classpath has two specific mentions of
 # awesome-methods-1.0.jar, only the first one added will be seen.
diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties
index 16fdcf0..f061313 100644
--- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties
+++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties
@@ -47,7 +47,7 @@
 #*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
 
 # Tag values to use for the ganglia prefix. If not defined no tags are used.
-# If '*' all tags are used. If specifiying multiple tags separate them with 
+# If '*' all tags are used. If specifying multiple tags separate them with
 # commas. Note that the last segment of the property name is the context name.
 #
 # A typical use of tags is separating the metrics by the HDFS rpc port
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 7b46075..11815da 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -1036,13 +1036,13 @@
           public Token<?> run() throws Exception {
             // Not using the cached token here.. Creating a new token here
             // everytime.
-            LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
+            LOG.info("Getting new token from {}, renewer:{}", url, renewer);
             return authUrl.getDelegationToken(url,
                 new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
           }
         });
         if (token != null) {
-          LOG.debug("New token received: ({})", token);
+          LOG.info("New token received: ({})", token);
           credentials.addToken(token.getService(), token);
           tokens = new Token<?>[] { token };
         } else {
diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties b/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties
index 04a3cf3..e2afd41 100644
--- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties
+++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties
@@ -37,4 +37,6 @@
 log4j.logger.com.sun.jersey.server.wadl.generators.WadlGeneratorJAXBGrammarGenerator=OFF
 # make zookeeper log level an explicit config, and not changing with rootLogger.
 log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.org.apache.curator=INFO
\ No newline at end of file
+log4j.logger.org.apache.curator=INFO
+# make jetty log level an explicit config, and not changing with rootLogger.
+log4j.logger.org.eclipse.jetty=INFO
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties b/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties
index e319af6..b8e6353 100644
--- a/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties
+++ b/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties
@@ -31,4 +31,6 @@
 log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
 # make zookeeper log level an explicit config, and not changing with rootLogger.
 log4j.logger.org.apache.zookeeper=INFO
-log4j.logger.org.apache.curator=INFO
\ No newline at end of file
+log4j.logger.org.apache.curator=INFO
+# make jetty log level an explicit config, and not changing with rootLogger.
+log4j.logger.org.eclipse.jetty=INFO
\ No newline at end of file
diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml
index a497133..6e1927d 100644
--- a/hadoop-hdds/framework/pom.xml
+++ b/hadoop-hdds/framework/pom.xml
@@ -39,6 +39,11 @@
       <artifactId>hadoop-hdds-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index 44d85f5..7e29223 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdds.server.events;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +46,8 @@
   private static final Logger LOG =
       LoggerFactory.getLogger(EventQueue.class);
 
+  private static final String EXECUTOR_NAME_SEPARATOR = "For";
+
   private final Map<Event, Map<EventExecutor, List<EventHandler>>> executors =
       new HashMap<>();
 
@@ -51,38 +57,74 @@
 
   public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
       EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
-
-    this.addHandler(event, new SingleThreadExecutor<>(
-        event.getName()), handler);
-  }
-
-  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
-      EVENT_TYPE event,
-      EventExecutor<PAYLOAD> executor,
-      EventHandler<PAYLOAD> handler) {
-
-    executors.putIfAbsent(event, new HashMap<>());
-    executors.get(event).putIfAbsent(executor, new ArrayList<>());
-
-    executors.get(event)
-        .get(executor)
-        .add(handler);
+    this.addHandler(event, handler, generateHandlerName(handler));
   }
 
   /**
-   * Creates one executor with multiple event handlers.
+   * Add new handler to the event queue.
+   * <p>
+   * By default a separate single-thread executor will be dedicated to
+   * deliver the events to the registered event handler.
+   *
+   * @param event        Triggering event.
+   * @param handler      Handler of event (will be called from a separate
+   *                     thread)
+   * @param handlerName  The name of the handler (should be unique together
+   *                     with the event name)
+   * @param <PAYLOAD>    The type of the event payload.
+   * @param <EVENT_TYPE> The type of the event identifier.
    */
-  public void addHandlerGroup(String name, HandlerForEvent<?>...
-      eventsAndHandlers) {
-    SingleThreadExecutor sharedExecutor =
-        new SingleThreadExecutor(name);
-    for (HandlerForEvent handlerForEvent : eventsAndHandlers) {
-      addHandler(handlerForEvent.event, sharedExecutor,
-          handlerForEvent.handler);
-    }
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+      EVENT_TYPE event, EventHandler<PAYLOAD> handler, String handlerName) {
+    validateEvent(event);
+    Preconditions.checkNotNull(handler, "Handler name should not be null.");
+    String executorName =
+        StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR
+            + handlerName;
+    this.addHandler(event, new SingleThreadExecutor<>(executorName), handler);
+  }
+
+  private <EVENT_TYPE extends Event<?>> void validateEvent(EVENT_TYPE event) {
+    Preconditions
+        .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR),
+            "Event name should not contain " + EXECUTOR_NAME_SEPARATOR
+                + " string.");
 
   }
 
+  private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
+    if (!"".equals(handler.getClass().getSimpleName())) {
+      return handler.getClass().getSimpleName();
+    } else {
+      return handler.getClass().getName();
+    }
+  }
+
+  /**
+   * Add event handler with custom executor.
+   *
+   * @param event        Triggering event.
+   * @param executor     The executor implementation to deliver events from a
+   *                     separate thread. Please keep in mind that
+   *                     registering metrics is the responsibility of the
+   *                     caller.
+   * @param handler      Handler of event (will be called from a separate
+   *                     thread)
+   * @param <PAYLOAD>    The type of the event payload.
+   * @param <EVENT_TYPE> The type of the event identifier.
+   */
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
+      EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
+      EventHandler<PAYLOAD> handler) {
+    validateEvent(event);
+    executors.putIfAbsent(event, new HashMap<>());
+    executors.get(event).putIfAbsent(executor, new ArrayList<>());
+
+    executors.get(event).get(executor).add(handler);
+  }
+
+
+
   /**
    * Route an event with payload to the right listener(s).
    *
@@ -183,31 +225,5 @@
     });
   }
 
-  /**
-   * Event identifier together with the handler.
-   *
-   * @param <PAYLOAD>
-   */
-  public static class HandlerForEvent<PAYLOAD> {
-
-    private final Event<PAYLOAD> event;
-
-    private final EventHandler<PAYLOAD> handler;
-
-    public HandlerForEvent(
-        Event<PAYLOAD> event,
-        EventHandler<PAYLOAD> handler) {
-      this.event = event;
-      this.handler = handler;
-    }
-
-    public Event<PAYLOAD> getEvent() {
-      return event;
-    }
-
-    public EventHandler<PAYLOAD> getHandler() {
-      return handler;
-    }
-  }
 
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index 19fddde..8c5605a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -26,12 +26,17 @@
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
 import org.apache.hadoop.ozone.lease.LeaseExpiredException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.map.HashedMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,18 +63,39 @@
 
   private final LeaseManager<UUID> leaseManager;
 
+  private final EventWatcherMetrics metrics;
+
+  private final String name;
+
   protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
       new ConcurrentHashMap<>();
 
   protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
 
-  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+  private final Map<UUID, Long> startTrackingTimes = new HashedMap();
+
+  public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
       Event<COMPLETION_PAYLOAD> completionEvent,
       LeaseManager<UUID> leaseManager) {
     this.startEvent = startEvent;
     this.completionEvent = completionEvent;
     this.leaseManager = leaseManager;
+    this.metrics = new EventWatcherMetrics();
+    Preconditions.checkNotNull(name);
+    if (name.equals("")) {
+      name = getClass().getSimpleName();
+    }
+    if (name.equals("")) {
+      //for anonymous inner classes
+      name = getClass().getName();
+    }
+    this.name = name;
+  }
 
+  public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
+      Event<COMPLETION_PAYLOAD> completionEvent,
+      LeaseManager<UUID> leaseManager) {
+    this("", startEvent, completionEvent, leaseManager);
   }
 
   public void start(EventQueue queue) {
@@ -87,11 +113,16 @@
       }
     });
 
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.register(name, "EventWatcher metrics", metrics);
   }
 
   private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
       EventPublisher publisher) {
+    metrics.incrementTrackedEvents();
     UUID identifier = payload.getUUID();
+    startTrackingTimes.put(identifier, System.currentTimeMillis());
+
     trackedEventsByUUID.put(identifier, payload);
     trackedEvents.add(payload);
     try {
@@ -112,16 +143,21 @@
 
   private synchronized void handleCompletion(UUID uuid,
       EventPublisher publisher) throws LeaseNotFoundException {
+    metrics.incrementCompletedEvents();
     leaseManager.release(uuid);
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
     trackedEvents.remove(payload);
+    long originalTime = startTrackingTimes.remove(uuid);
+    metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
     onFinished(publisher, payload);
   }
 
   private synchronized void handleTimeout(EventPublisher publisher,
       UUID identifier) {
+    metrics.incrementTimedOutEvents();
     TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
     trackedEvents.remove(payload);
+    startTrackingTimes.remove(payload.getUUID());
     onTimeout(publisher, payload);
   }
 
@@ -154,4 +190,9 @@
     return trackedEventsByUUID.values().stream().filter(predicate)
         .collect(Collectors.toList());
   }
+
+  @VisibleForTesting
+  protected EventWatcherMetrics getMetrics() {
+    return metrics;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
new file mode 100644
index 0000000..1db81a9
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.server.events;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Metrics for any event watcher.
+ */
+public class EventWatcherMetrics {
+
+  @Metric()
+  private MutableCounterLong trackedEvents;
+
+  @Metric()
+  private MutableCounterLong timedOutEvents;
+
+  @Metric()
+  private MutableCounterLong completedEvents;
+
+  @Metric()
+  private MutableRate completionTime;
+
+  public void incrementTrackedEvents() {
+    trackedEvents.incr();
+  }
+
+  public void incrementTimedOutEvents() {
+    timedOutEvents.incr();
+  }
+
+  public void incrementCompletedEvents() {
+    completedEvents.incr();
+  }
+
+  @VisibleForTesting
+  public void updateFinishingTime(long duration) {
+    completionTime.add(duration);
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTrackedEvents() {
+    return trackedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getTimedOutEvents() {
+    return timedOutEvents;
+  }
+
+  @VisibleForTesting
+  public MutableCounterLong getCompletedEvents() {
+    return completedEvents;
+  }
+
+  @VisibleForTesting
+  public MutableRate getCompletionTime() {
+    return completionTime;
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
index a64e3d7..3253f2d 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java
@@ -23,13 +23,18 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 
 /**
  * Simple EventExecutor to call all the event handler one-by-one.
  *
  * @param <T>
  */
+@Metrics(context = "EventQueue")
 public class SingleThreadExecutor<T> implements EventExecutor<T> {
 
   public static final String THREAD_NAME_PREFIX = "EventQueue";
@@ -41,14 +46,24 @@
 
   private final ThreadPoolExecutor executor;
 
-  private final AtomicLong queuedCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong queued;
 
-  private final AtomicLong successfulCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong done;
 
-  private final AtomicLong failedCount = new AtomicLong(0);
+  @Metric
+  private MutableCounterLong failed;
 
+  /**
+   * Create SingleThreadExecutor.
+   *
+   * @param name Unique name used in monitoring and metrics.
+   */
   public SingleThreadExecutor(String name) {
     this.name = name;
+    DefaultMetricsSystem.instance()
+        .register("EventQueue" + name, "Event Executor metrics ", this);
 
     LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     executor =
@@ -64,31 +79,31 @@
   @Override
   public void onMessage(EventHandler<T> handler, T message, EventPublisher
       publisher) {
-    queuedCount.incrementAndGet();
+    queued.incr();
     executor.execute(() -> {
       try {
         handler.onMessage(message, publisher);
-        successfulCount.incrementAndGet();
+        done.incr();
       } catch (Exception ex) {
         LOG.error("Error on execution message {}", message, ex);
-        failedCount.incrementAndGet();
+        failed.incr();
       }
     });
   }
 
   @Override
   public long failedEvents() {
-    return failedCount.get();
+    return failed.value();
   }
 
   @Override
   public long successfulEvents() {
-    return successfulCount.get();
+    return done.value();
   }
 
   @Override
   public long queuedEvents() {
-    return queuedCount.get();
+    return queued.value();
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
index 3944409..2bdf705 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java
@@ -25,6 +25,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
 /**
  * Testing the basic functionality of the event queue.
  */
@@ -44,11 +46,13 @@
 
   @Before
   public void startEventQueue() {
+    DefaultMetricsSystem.initialize(getClass().getSimpleName());
     queue = new EventQueue();
   }
 
   @After
   public void stopEventQueue() {
+    DefaultMetricsSystem.shutdown();
     queue.close();
   }
 
@@ -79,35 +83,4 @@
 
   }
 
-  @Test
-  public void handlerGroup() {
-    final long[] result = new long[2];
-    queue.addHandlerGroup(
-        "group",
-        new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) ->
-            result[0] = payload),
-        new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) ->
-            result[1] = payload)
-    );
-
-    queue.fireEvent(EVENT3, 23L);
-    queue.fireEvent(EVENT4, 42L);
-
-    queue.processAll(1000);
-
-    Assert.assertEquals(23, result[0]);
-    Assert.assertEquals(42, result[1]);
-
-    Set<String> eventQueueThreadNames =
-        Thread.getAllStackTraces().keySet()
-            .stream()
-            .filter(t -> t.getName().startsWith(SingleThreadExecutor
-                .THREAD_NAME_PREFIX))
-            .map(Thread::getName)
-            .collect(Collectors.toSet());
-    System.out.println(eventQueueThreadNames);
-    Assert.assertEquals(1, eventQueueThreadNames.size());
-
-  }
-
 }
\ No newline at end of file
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
index 1731350..38e1554 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -21,8 +21,13 @@
 import java.util.Objects;
 import java.util.UUID;
 
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.MetricsAsserts;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -46,6 +51,7 @@
 
   @Before
   public void startLeaseManager() {
+    DefaultMetricsSystem.instance();
     leaseManager = new LeaseManager<>(2000l);
     leaseManager.start();
   }
@@ -53,12 +59,12 @@
   @After
   public void stopLeaseManager() {
     leaseManager.shutdown();
+    DefaultMetricsSystem.shutdown();
   }
 
 
   @Test
   public void testEventHandling() throws InterruptedException {
-
     EventQueue queue = new EventQueue();
 
     EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
@@ -139,26 +145,101 @@
     Assert.assertEquals(0, c1todo.size());
     Assert.assertFalse(replicationWatcher.contains(event1));
 
+  }
 
+  @Test
+  public void testMetrics() throws InterruptedException {
+
+    DefaultMetricsSystem.initialize("test");
+
+    EventQueue queue = new EventQueue();
+
+    EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
+        replicationWatcher = createEventWatcher();
+
+    EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
+        new EventHandlerStub<>();
+
+    queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
+
+    replicationWatcher.start(queue);
+
+    //send 3 events to track 3 in-progress activities
+    UnderreplicatedEvent event1 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    UnderreplicatedEvent event2 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C2");
+
+    UnderreplicatedEvent event3 =
+        new UnderreplicatedEvent(UUID.randomUUID(), "C1");
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event2);
+
+    queue.fireEvent(WATCH_UNDER_REPLICATED, event3);
+
+    //1st event is completed, don't need to track any more
+    ReplicationCompletedEvent event1Completed =
+        new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
+
+    queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
+
+
+    Thread.sleep(2200l);
+
+    //until now: 3 in-progress activities are tracked with three
+    // UnderreplicatedEvents. The first one is completed, the remaining two
+    // are timed out (the timeout defined in the lease manager is 2000ms).
+
+    EventWatcherMetrics metrics = replicationWatcher.getMetrics();
+
+    //3 events are received
+    Assert.assertEquals(3, metrics.getTrackedEvents().value());
+
+    //one is finished. doesn't need to be resent
+    Assert.assertEquals(1, metrics.getCompletedEvents().value());
+
+    //Other two are timed out and resent
+    Assert.assertEquals(2, metrics.getTimedOutEvents().value());
+
+    DefaultMetricsSystem.shutdown();
   }
 
   private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
   createEventWatcher() {
-    return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>(
-        WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
-
-      @Override
-      void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
-        publisher.fireEvent(UNDER_REPLICATED, payload);
-      }
-
-      @Override
-      void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
-        //Good job. We did it.
-      }
-    };
+    return new CommandWatcherExample(WATCH_UNDER_REPLICATED,
+        REPLICATION_COMPLETED, leaseManager);
   }
 
+  private class CommandWatcherExample
+      extends EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> {
+
+    public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent,
+        Event<ReplicationCompletedEvent> completionEvent,
+        LeaseManager<UUID> leaseManager) {
+      super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
+    }
+
+    @Override
+    void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+      publisher.fireEvent(UNDER_REPLICATED, payload);
+    }
+
+    @Override
+    void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+      //Good job. We did it.
+    }
+
+    @Override
+    public EventWatcherMetrics getMetrics() {
+      return super.getMetrics();
+    }
+  }
+
+  ;
+
   private static class ReplicationCompletedEvent
       implements IdentifiableEventPayload {
 
@@ -217,4 +298,4 @@
     }
   }
 
-}
\ No newline at end of file
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
index 387e399..5d4ab4a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java
@@ -36,19 +36,9 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
-import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,87 +58,6 @@
   }
 
   /**
-   * Handle ApplicationNotRegistered exception and re-register.
-   *
-   * @param appId application Id
-   * @param rmProxy RM proxy instance
-   * @param registerRequest the AM re-register request
-   * @throws YarnException if re-register fails
-   */
-  public static void handleNotRegisteredExceptionAndReRegister(
-      ApplicationId appId, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest) throws YarnException {
-    LOG.info("App attempt {} not registered, most likely due to RM failover. "
-        + " Trying to re-register.", appId);
-    try {
-      rmProxy.registerApplicationMaster(registerRequest);
-    } catch (Exception e) {
-      if (e instanceof InvalidApplicationMasterRequestException
-          && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) {
-        LOG.info("Concurrent thread successfully registered, moving on.");
-      } else {
-        LOG.error("Error trying to re-register AM", e);
-        throw new YarnException(e);
-      }
-    }
-  }
-
-  /**
-   * Helper method for client calling ApplicationMasterProtocol.allocate that
-   * handles re-register if RM fails over.
-   *
-   * @param request allocate request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return allocate response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static AllocateResponse allocateWithReRegister(AllocateRequest request,
-      ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.allocate(request);
-    } catch (ApplicationMasterNotRegisteredException e) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // reset responseId after re-register
-      request.setResponseId(0);
-      // retry allocate
-      return allocateWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
-   * Helper method for client calling
-   * ApplicationMasterProtocol.finishApplicationMaster that handles re-register
-   * if RM fails over.
-   *
-   * @param request finishApplicationMaster request
-   * @param rmProxy RM proxy
-   * @param registerRequest the register request for re-register
-   * @param appId application id
-   * @return finishApplicationMaster response
-   * @throws YarnException if RM call fails
-   * @throws IOException if RM call fails
-   */
-  public static FinishApplicationMasterResponse finishAMWithReRegister(
-      FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy,
-      RegisterApplicationMasterRequest registerRequest, ApplicationId appId)
-      throws YarnException, IOException {
-    try {
-      return rmProxy.finishApplicationMaster(request);
-    } catch (ApplicationMasterNotRegisteredException ex) {
-      handleNotRegisteredExceptionAndReRegister(appId, rmProxy,
-          registerRequest);
-      // retry finishAM after re-register
-      return finishAMWithReRegister(request, rmProxy, registerRequest, appId);
-    }
-  }
-
-  /**
    * Create a proxy for the specified protocol.
    *
    * @param configuration Configuration to generate {@link ClientRMProxy}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
index e8a7f64..0d1a27e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java
@@ -147,6 +147,11 @@
     super.serviceStop();
   }
 
+  public void setAMRegistrationRequest(
+      RegisterApplicationMasterRequest registerRequest) {
+    this.amRegistrationRequest = registerRequest;
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
@@ -259,8 +264,10 @@
           }
         }
 
-        // re register with RM, then retry allocate recursively
+        // re-register with RM, then retry allocate recursively
         registerApplicationMaster(this.amRegistrationRequest);
+        // Reset responseId after re-register
+        allocateRequest.setResponseId(0);
         return allocate(allocateRequest);
       }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
index 02eef29..5f9d81b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java
@@ -50,6 +50,7 @@
 import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.util.AsyncCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -385,4 +386,19 @@
     return this.unmanagedAppMasterMap.containsKey(uamId);
   }
 
+  /**
+   * Returns the rmProxy relayer of a UAM.
+   *
+   * @param uamId UAM id
+   * @return the rmProxy relayer
+   * @throws YarnException if no UAM with the given id exists
+   */
+  public AMRMClientRelayer getAMRMClientRelayer(String uamId)
+      throws YarnException {
+    if (!this.unmanagedAppMasterMap.containsKey(uamId)) {
+      throw new YarnException("UAM " + uamId + " does not exist");
+    }
+    return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer();
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 73795dc..856a818 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.AsyncCallback;
@@ -90,7 +91,7 @@
 
   private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
   private AMRequestHandlerThread handlerThread;
-  private ApplicationMasterProtocol rmProxy;
+  private AMRMClientRelayer rmProxyRelayer;
   private ApplicationId applicationId;
   private String submitter;
   private String appNameSuffix;
@@ -138,7 +139,7 @@
     this.appNameSuffix = appNameSuffix;
     this.handlerThread = new AMRequestHandlerThread();
     this.requestQueue = new LinkedBlockingQueue<>();
-    this.rmProxy = null;
+    this.rmProxyRelayer = null;
     this.connectionInitiated = false;
     this.registerRequest = null;
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
@@ -190,8 +191,9 @@
       throws IOException {
     this.userUgi = UserGroupInformation.createProxyUser(
         this.applicationId.toString(), UserGroupInformation.getCurrentUser());
-    this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf,
-        this.userUgi, amrmToken);
+    this.rmProxyRelayer =
+        new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
+            this.conf, this.userUgi, amrmToken));
   }
 
   /**
@@ -209,19 +211,18 @@
     // Save the register request for re-register later
     this.registerRequest = request;
 
-    // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM.
-    // We do not expect application already registered exception here
     LOG.info("Registering the Unmanaged application master {}",
         this.applicationId);
     RegisterApplicationMasterResponse response =
-        this.rmProxy.registerApplicationMaster(this.registerRequest);
+        this.rmProxyRelayer.registerApplicationMaster(this.registerRequest);
+    this.lastResponseId = 0;
 
     for (Container container : response.getContainersFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing running container "
+      LOG.debug("RegisterUAM returned existing running container "
           + container.getId());
     }
     for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) {
-      LOG.info("RegisterUAM returned existing NM token for node "
+      LOG.debug("RegisterUAM returned existing NM token for node "
           + nmToken.getNodeId());
     }
 
@@ -249,7 +250,7 @@
 
     this.handlerThread.shutdown();
 
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         // This is possible if the async launchUAM is still
         // blocked and retrying. Return a dummy response in this case.
@@ -261,8 +262,7 @@
             + "be called before createAndRegister");
       }
     }
-    return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy,
-        this.registerRequest, this.applicationId);
+    return this.rmProxyRelayer.finishApplicationMaster(request);
   }
 
   /**
@@ -308,7 +308,7 @@
     //
     // In case 2, we have already save the allocate request above, so if the
     // registration succeed later, no request is lost.
-    if (this.rmProxy == null) {
+    if (this.rmProxyRelayer == null) {
       if (this.connectionInitiated) {
         LOG.info("Unmanaged AM still not successfully launched/registered yet."
             + " Saving the allocate request and send later.");
@@ -329,6 +329,15 @@
   }
 
   /**
+   * Returns the rmProxy relayer of this UAM.
+   *
+   * @return rmProxy relayer of the UAM
+   */
+  public AMRMClientRelayer getAMRMClientRelayer() {
+    return this.rmProxyRelayer;
+  }
+
+  /**
    * Returns RM proxy for the specified protocol type. Unit test cases can
    * override this method and return mock proxy instances.
    *
@@ -592,10 +601,7 @@
           }
 
           request.setResponseId(lastResponseId);
-
-          AllocateResponse response = AMRMClientUtils.allocateWithReRegister(
-              request, rmProxy, registerRequest, applicationId);
-
+          AllocateResponse response = rmProxyRelayer.allocate(request);
           if (response == null) {
             throw new YarnException("Null allocateResponse from allocate");
           }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 23cd3e2..9b4d91d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -251,8 +251,6 @@
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Registering application attempt: " + attemptId);
 
-    shouldReRegisterNext = false;
-
     List<Container> containersFromPreviousAttempt = null;
 
     synchronized (applicationContainerIdMap) {
@@ -266,7 +264,7 @@
             containersFromPreviousAttempt.add(Container.newInstance(containerId,
                 null, null, null, null, null));
           }
-        } else {
+        } else if (!shouldReRegisterNext) {
           throw new InvalidApplicationMasterRequestException(
               AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
         }
@@ -276,6 +274,8 @@
       }
     }
 
+    shouldReRegisterNext = false;
+
     // Make sure we wait for certain test cases last in the method
     synchronized (syncObj) {
       syncObj.notifyAll();
@@ -339,13 +339,6 @@
 
     validateRunning();
 
-    if (request.getAskList() != null && request.getAskList().size() > 0
-        && request.getReleaseList() != null
-        && request.getReleaseList().size() > 0) {
-      Assert.fail("The mock RM implementation does not support receiving "
-          + "askList and releaseList in the same heartbeat");
-    }
-
     ApplicationAttemptId attemptId = getAppIdentifier();
     LOG.info("Allocate from application attempt: " + attemptId);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index 5740749..645e47e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -62,14 +62,15 @@
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.AMRMClientRelayer;
 import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
@@ -106,9 +107,9 @@
   public static final String NMSS_REG_RESPONSE_KEY =
       NMSS_CLASS_PREFIX + "registerResponse";
 
-  /*
+  /**
    * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
-   * Registry. Otherwise if NM recovery is enabled, the UAM token are store in
+   * Registry. Otherwise if NM recovery is enabled, the UAM tokens are stored in
    * local NMSS instead under this directory name.
    */
   public static final String NMSS_SECONDARY_SC_PREFIX =
@@ -119,8 +120,23 @@
    * The home sub-cluster is the sub-cluster where the AM container is running
    * in.
    */
-  private ApplicationMasterProtocol homeRM;
+  private AMRMClientRelayer homeRMRelayer;
   private SubClusterId homeSubClusterId;
+  private volatile int lastHomeResponseId;
+
+  /**
+   * A flag for work-preserving NM restart. If we just recovered, we need to
+   * throw an {@link ApplicationMasterNotRegisteredException} back
+   * to AM (similar to what RM will do after its restart/fail-over) in its next
+   * allocate to trigger AM re-register (which we will shield from RM and just
+   * return our saved register response) and a full pending requests re-send, so
+   * that all the {@link AMRMClientRelayer} will be re-populated with all
+   * pending requests.
+   *
+   * TODO: When split-merge is not idempotent, this can lead to some
+   * over-allocation without a full cancel to RM.
+   */
+  private volatile boolean justRecovered;
 
   /**
    * UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@@ -134,6 +150,12 @@
    */
   private UnmanagedAMPoolManager uamPool;
 
+  /**
+   * The rmProxy relayers for secondary sub-clusters that keep track of all
+   * pending requests.
+   */
+  private Map<String, AMRMClientRelayer> secondaryRelayers;
+
   /** Thread pool used for asynchronous operations. */
   private ExecutorService threadpool;
 
@@ -186,8 +208,11 @@
     this.asyncResponseSink = new ConcurrentHashMap<>();
     this.threadpool = Executors.newCachedThreadPool();
     this.uamPool = createUnmanagedAMPoolManager(this.threadpool);
+    this.secondaryRelayers = new ConcurrentHashMap<>();
     this.amRegistrationRequest = null;
     this.amRegistrationResponse = null;
+    this.lastHomeResponseId = Integer.MAX_VALUE;
+    this.justRecovered = false;
   }
 
   /**
@@ -224,8 +249,8 @@
 
     this.homeSubClusterId =
         SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
-    this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
-        this.appOwner);
+    this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
+        ApplicationMasterProtocol.class, this.appOwner));
 
     this.federationFacade = FederationStateStoreFacade.getInstance();
     this.subClusterResolver = this.federationFacade.getSubClusterResolver();
@@ -240,13 +265,12 @@
   @Override
   public void recover(Map<String, byte[]> recoveredDataMap) {
     super.recover(recoveredDataMap);
-    LOG.info("Recovering data for FederationInterceptor");
+    ApplicationAttemptId attemptId =
+        getApplicationContext().getApplicationAttemptId();
+    LOG.info("Recovering data for FederationInterceptor for {}", attemptId);
     if (recoveredDataMap == null) {
       return;
     }
-
-    ApplicationAttemptId attemptId =
-        getApplicationContext().getApplicationAttemptId();
     try {
       if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) {
         RegisterApplicationMasterRequestProto pb =
@@ -255,6 +279,9 @@
         this.amRegistrationRequest =
             new RegisterApplicationMasterRequestPBImpl(pb);
         LOG.info("amRegistrationRequest recovered for {}", attemptId);
+
+        // Give the register request to homeRMRelayer for future re-registration
+        this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest);
       }
       if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) {
         RegisterApplicationMasterResponseProto pb =
@@ -263,6 +290,9 @@
         this.amRegistrationResponse =
             new RegisterApplicationMasterResponsePBImpl(pb);
         LOG.info("amRegistrationResponse recovered for {}", attemptId);
+        // Trigger re-register and full pending re-send only if we have a
+        // saved register response. This should always be true though.
+        this.justRecovered = true;
       }
 
       // Recover UAM amrmTokens from registry or NMSS
@@ -309,6 +339,9 @@
               getApplicationContext().getUser(), this.homeSubClusterId.getId(),
               entry.getValue());
 
+          this.secondaryRelayers.put(subClusterId.getId(),
+              this.uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
           RegisterApplicationMasterResponse response =
               this.uamPool.registerApplicationMaster(subClusterId.getId(),
                   this.amRegistrationRequest);
@@ -436,7 +469,7 @@
      * the other sub-cluster RM will be done lazily as needed later.
      */
     this.amRegistrationResponse =
-        this.homeRM.registerApplicationMaster(request);
+        this.homeRMRelayer.registerApplicationMaster(request);
     if (this.amRegistrationResponse
         .getContainersFromPreviousAttempts() != null) {
       cacheAllocatedContainers(
@@ -495,6 +528,34 @@
     Preconditions.checkArgument(this.policyInterpreter != null,
         "Allocate should be called after registerApplicationMaster");
 
+    if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) {
+      // Save the responseId home RM is expecting
+      this.lastHomeResponseId = request.getResponseId();
+
+      throw new ApplicationMasterNotRegisteredException(
+          "AMRMProxy just restarted and recovered for "
+              + getApplicationContext().getApplicationAttemptId()
+              + ". AM should re-register and full re-send pending requests.");
+    }
+
+    // Override responseId in the request in two cases:
+    //
+    // 1. Right after recovering from an NM restart, AM's responseId has been
+    // reset due to the exception we generate. We need to override the
+    // responseId to the one homeRM expects.
+    //
+    // 2. After homeRM fail-over, the allocate response with the reset
+    // responseId might not be returned successfully back to AM because of an
+    // RPC connection timeout between AM and AMRMProxy. In this case, we
+    // remember and reset the responseId for AM.
+    if (this.justRecovered
+        || request.getResponseId() > this.lastHomeResponseId) {
+      LOG.warn("Setting allocate responseId for {} from {} to {}",
+          getApplicationContext().getApplicationAttemptId(),
+          request.getResponseId(), this.lastHomeResponseId);
+      request.setResponseId(this.lastHomeResponseId);
+    }
+
     try {
       // Split the heart beat request into multiple requests, one for each
       // sub-cluster RM that is used by this application.
@@ -509,10 +570,18 @@
           sendRequestsToSecondaryResourceManagers(requests);
 
       // Send the request to the home RM and get the response
-      AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister(
-          requests.get(this.homeSubClusterId), this.homeRM,
-          this.amRegistrationRequest,
-          getApplicationContext().getApplicationAttemptId().getApplicationId());
+      AllocateRequest homeRequest = requests.get(this.homeSubClusterId);
+      LOG.info("{} heartbeating to home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeRequest.getResponseId());
+
+      AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest);
+
+      // Reset the flag after the first successful homeRM allocate response;
+      // otherwise we would keep overriding the responseId of new requests.
+      if (this.justRecovered) {
+        this.justRecovered = false;
+      }
 
       // Notify policy of home response
       try {
@@ -540,6 +609,22 @@
             newRegistrations.getSuccessfulRegistrations());
       }
 
+      LOG.info("{} heartbeat response from home RM with responseId {}",
+          getApplicationContext().getApplicationAttemptId(),
+          homeResponse.getResponseId());
+
+      // Update lastHomeResponseId in three cases:
+      // 1. The responseId incremented normally.
+      // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails
+      // over and AMRMClientRelayer auto re-registers and does a full re-send.
+      // 3. lastHomeResponseId == MAX_INT. This is the initial case, or the
+      // responseId is about to overflow and wrap around.
+      if (homeResponse.getResponseId() == this.lastHomeResponseId + 1
+          || homeResponse.getResponseId() == 1
+          || this.lastHomeResponseId == Integer.MAX_VALUE) {
+        this.lastHomeResponseId = homeResponse.getResponseId();
+      }
+
       // return the final response to the application master.
       return homeResponse;
     } catch (IOException ex) {
@@ -584,6 +669,16 @@
             try {
               uamResponse =
                   uamPool.finishApplicationMaster(subClusterId, finishRequest);
+
+              if (uamResponse.getIsUnregistered()) {
+                secondaryRelayers.remove(subClusterId);
+
+                if (getNMStateStore() != null) {
+                  getNMStateStore().removeAMRMProxyAppContextEntry(
+                      getApplicationContext().getApplicationAttemptId(),
+                      NMSS_SECONDARY_SC_PREFIX + subClusterId);
+                }
+              }
             } catch (Throwable e) {
               LOG.warn("Failed to finish unmanaged application master: "
                   + "RM address: " + subClusterId + " ApplicationId: "
@@ -600,9 +695,7 @@
     // asynchronously by other sub-cluster resource managers, send the same
     // request to the home resource manager on this thread.
     FinishApplicationMasterResponse homeResponse =
-        AMRMClientUtils.finishAMWithReRegister(request, this.homeRM,
-            this.amRegistrationRequest, getApplicationContext()
-                .getApplicationAttemptId().getApplicationId());
+        this.homeRMRelayer.finishApplicationMaster(request);
 
     if (subClusterIds.size() > 0) {
       // Wait for other sub-cluster resource managers to return the
@@ -621,10 +714,6 @@
           if (uamResponse.getResponse() == null
               || !uamResponse.getResponse().getIsUnregistered()) {
             failedToUnRegister = true;
-          } else if (getNMStateStore() != null) {
-            getNMStateStore().removeAMRMProxyAppContextEntry(
-                getApplicationContext().getApplicationAttemptId(),
-                NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId());
           }
         } catch (Throwable e) {
           failedToUnRegister = true;
@@ -689,6 +778,11 @@
     return this.registryClient;
   }
 
+  @VisibleForTesting
+  protected int getLastHomeResponseId() {
+    return this.lastHomeResponseId;
+  }
+
   /**
    * Create the UAM pool manager for secondary sub-clsuters. For unit test to
    * override.
@@ -800,6 +894,9 @@
                     getApplicationContext().getUser(), homeSubClusterId.getId(),
                     amrmToken);
 
+                secondaryRelayers.put(subClusterId.getId(),
+                    uamPool.getAMRMClientRelayer(subClusterId.getId()));
+
                 response = uamPool.registerApplicationMaster(
                     subClusterId.getId(), amRegistrationRequest);
 
@@ -1098,7 +1195,10 @@
                   token = uamPool.launchUAM(subClusterId, config,
                       appContext.getApplicationAttemptId().getApplicationId(),
                       amRegistrationResponse.getQueue(), appContext.getUser(),
-                      homeSubClusterId.toString(), registryClient != null);
+                      homeSubClusterId.toString(), true);
+
+                  secondaryRelayers.put(subClusterId,
+                      uamPool.getAMRMClientRelayer(subClusterId));
 
                   uamResponse = uamPool.registerApplicationMaster(subClusterId,
                       registerRequest);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 677732d..2794857 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -32,6 +32,7 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -536,6 +537,7 @@
     capability.setMemorySize(memory);
     capability.setVirtualCores(vCores);
     req.setCapability(capability);
+    req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance());
     if (labelExpression != null) {
       req.setNodeLabelExpression(labelExpression);
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index eefaba1..a837eed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -52,6 +52,7 @@
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
@@ -516,6 +517,22 @@
         interceptor.recover(recoveredDataMap);
 
         Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
+        Assert.assertEquals(Integer.MAX_VALUE,
+            interceptor.getLastHomeResponseId());
+
+        // The first allocate call expects a fail-over exception and re-register
+        int responseId = 10;
+        AllocateRequest allocateRequest =
+            Records.newRecord(AllocateRequest.class);
+        allocateRequest.setResponseId(responseId);
+        try {
+          interceptor.allocate(allocateRequest);
+          Assert.fail("Expecting an ApplicationMasterNotRegisteredException  "
+              + " after FederationInterceptor restarts and recovers");
+        } catch (ApplicationMasterNotRegisteredException e) {
+        }
+        interceptor.registerApplicationMaster(registerReq);
+        Assert.assertEquals(responseId, interceptor.getLastHomeResponseId());
 
         // Release all containers
         releaseContainersAndAssert(containers);