(TWILL-61) Fix to allow subsequent attempts to relaunch the app after the first attempt fails

- Delete the Kafka root zk node for the application if it already exists
- Delete the AM instance zk node if it already exists
- For the runnables parent zk node, it is not an error if it already exists
- Add settings for the YARN max app attempts and the attempt failures validity interval
  (see the example below)
- Enhance the KafkaClient publisher / consumer to deal with Kafka cluster changes
  - When the AM is killed and restarted, the embedded Kafka will be running on a different host and port
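
For illustration, the new settings can be supplied as Twill configuration when starting
an application. A minimal sketch (assumes a TwillRunner named `runner`, an application
class `MyApp`, and that TwillPreparer#withConfiguration is available; values are
examples only):

    Map<String, String> config = new HashMap<>();
    // Let YARN make up to 5 attempts to run the application on failure
    config.put(Configs.Keys.YARN_MAX_APP_ATTEMPTS, "5");
    // Only count failures from the last 60 seconds toward the attempt limit (Hadoop 2.6+)
    config.put(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, "60000");

    TwillController controller = runner.prepare(new MyApp())
      .withConfiguration(config)
      .start();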

This closes #67 on GitHub.

Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/pom.xml b/pom.xml
index 84c4b8c..45aa64d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,7 @@
         <jopt-simple.version>3.2</jopt-simple.version>
         <commons-compress.version>1.5</commons-compress.version>
         <hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+        <force.mac.tests>false</force.mac.tests>
     </properties>
 
     <scm>
@@ -346,6 +347,7 @@
                     <redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
                     <systemPropertyVariables>
                         <java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+                        <force.mac.tests>${force.mac.tests}</force.mac.tests>
                     </systemPropertyVariables>
                     <reuseForks>false</reuseForks>
                     <reportFormat>plain</reportFormat>
@@ -362,6 +364,18 @@
 
     <profiles>
         <profile>
+            <!--
+                This profile forces certain tests to run on Mac. Those tests are disabled
+                on Mac by default due to orphan processes left behind after the test run
+                (HADOOP-12317). If this profile is enabled, run the `jps` command after
+                the tests finish and kill all `TwillLauncher` processes.
+            -->
+            <id>force-mac-tests</id>
+            <properties>
+                <force.mac.tests>true</force.mac.tests>
+            </properties>
+        </profile>
+        <profile>
             <id>apache-release</id>
             <build>
                 <plugins>
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 9a21489..20a25f6 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -79,6 +79,18 @@
     public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
 
     /**
+     * Maximum number of attempts for YARN to run the application in case of failure.
+     */
+    public static final String YARN_MAX_APP_ATTEMPTS = "twill.yarn.max.app.attempts";
+
+    /**
+     * The attempt failures validity interval in milliseconds for YARN. YARN only counts failures that
+     * occurred within the given interval toward the maximum attempt count. Only takes effect on
+     * Apache Hadoop 2.6 and beyond.
+     */
+    public static final String YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL =
+      "twill.yarn.attempt.failures.validity.interval";
+
+    /**
      * Setting for enabling log collection.
      */
     public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index fd8a939..0ff2fc8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -222,6 +222,24 @@
     return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
   }
 
+  /**
+   * Resets the log handler to poll from the beginning of the Kafka log topic.
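+   * This is used when a new application attempt is detected: the restarted AM runs a new embedded
+   * Kafka server, so previously tracked offsets are no longer valid and consumption must restart
+   * from the beginning.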
+   */
+  protected final synchronized void resetLogHandler() {
+    if (kafkaClient == null) {
+      return;
+    }
+    if (logCancellable != null) {
+      logCancellable.cancel();
+      logCancellable = null;
+    }
+    if (!logHandlers.isEmpty()) {
+      logCancellable = kafkaClient.getConsumer().prepare()
+        .addFromBeginning(Constants.LOG_TOPIC, 0)
+        .consume(new LogMessageCallback(logHandlers));
+    }
+  }
+
   private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
     ResourceReport resourceReport = getResourceReport();
     if (resourceReport == null) {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8e73653..425cd43 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -206,7 +206,7 @@
   }
 
   /**
-   * Update the live node for the runnable.
+   * Update the live node for the service.
    *
    * @return A {@link OperationFuture} that will be completed when the update is done.
    */
@@ -216,11 +216,15 @@
     return zkClient.setData(liveNodePath, serializeLiveNode());
   }
 
+  /**
+   * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+   *
+   * @return A {@link OperationFuture} that will be completed when the creation is done.
+   */
   private OperationFuture<String> createLiveNode() {
-    String liveNodePath = getLiveNodePath();
-    LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
-    return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
-                                    KeeperException.NodeExistsException.class, liveNodePath);
+    final String liveNodePath = getLiveNodePath();
+    LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath);
+    return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true);
   }
 
   private OperationFuture<String> removeLiveNode() {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 73235b7..f69350e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -340,17 +340,17 @@
           continue;
         }
 
-        // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
-        // from kafak server.
-        long off = offset.get();
-        if (off < 0) {
-          offset.set(getLastOffset(topicPart, off));
-        }
-
-        SimpleConsumer consumer = consumerEntry.getValue();
-
-        // Fire a fetch message request
         try {
+          // If offset < 0, it is a special offset value indicating that either the earliest or latest
+          // offset should be fetched from the Kafka server.
+          long off = offset.get();
+          if (off < 0) {
+            offset.set(getLastOffset(topicPart, off));
+          }
+
+          SimpleConsumer consumer = consumerEntry.getValue();
+
+          // Fire a fetch message request
           FetchResponse response = fetchMessages(consumer, offset.get());
 
           // Failure response, set consumer entry to null and let next round of loop to handle it.
@@ -364,6 +364,7 @@
 
             consumers.refresh(consumerEntry.getKey());
             consumerEntry = null;
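+            // Backoff before the next round to avoid a tight retry loop while the consumer
+            // entry is being refreshed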
+            backoff.backoff();
             continue;
           }
 
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index f147d24..e5d0f8d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -54,11 +54,11 @@
   private final AtomicReference<Producer<Integer, ByteBuffer>> producer;
   private final AtomicBoolean listenerCancelled;
 
-  public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
+  SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
     this.brokerService = brokerService;
     this.ack = ack;
     this.compression = compression;
-    this.producer = new AtomicReference<Producer<Integer, ByteBuffer>>();
+    this.producer = new AtomicReference<>();
     this.listenerCancelled = new AtomicBoolean(false);
   }
 
@@ -107,7 +107,7 @@
 
     @Override
     public Preparer add(ByteBuffer message, Object partitionKey) {
-      messages.add(new KeyedMessage<Integer, ByteBuffer>(topic, Math.abs(partitionKey.hashCode()), message));
+      messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message));
       return this;
     }
 
@@ -159,30 +159,37 @@
       }
 
       String newBrokerList = brokerService.getBrokerList();
-      if (newBrokerList.isEmpty()) {
-        LOG.warn("Broker list is empty. No Kafka producer is created.");
-        return;
-      }
 
+      // If there is no change, whether it is empty or not, just return
       if (Objects.equal(brokerList, newBrokerList)) {
         return;
       }
 
-      Properties props = new Properties();
-      props.put("metadata.broker.list", newBrokerList);
-      props.put("serializer.class", ByteBufferEncoder.class.getName());
-      props.put("key.serializer.class", IntegerEncoder.class.getName());
-      props.put("partitioner.class", IntegerPartitioner.class.getName());
-      props.put("request.required.acks", Integer.toString(ack.getAck()));
-      props.put("compression.codec", compression.getCodec());
+      Producer<Integer, ByteBuffer> newProducer = null;
+      if (!newBrokerList.isEmpty()) {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", newBrokerList);
+        props.put("serializer.class", ByteBufferEncoder.class.getName());
+        props.put("key.serializer.class", IntegerEncoder.class.getName());
+        props.put("partitioner.class", IntegerPartitioner.class.getName());
+        props.put("request.required.acks", Integer.toString(ack.getAck()));
+        props.put("compression.codec", compression.getCodec());
 
-      ProducerConfig config = new ProducerConfig(props);
-      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
+        ProducerConfig config = new ProducerConfig(props);
+        newProducer = new Producer<>(config);
+      }
+
+      // If the broker list is empty, the producer will be set to null
+      Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
       if (oldProducer != null) {
         oldProducer.close();
       }
 
-      LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+      if (newBrokerList.isEmpty()) {
+        LOG.warn("Empty Kafka producer broker list, publish will fail.");
+      } else {
+        LOG.info("Updated Kafka producer broker list: {}", newBrokerList);
+      }
       brokerList = newBrokerList;
     }
   }
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
index 2ffc604..de42b9b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
@@ -51,6 +51,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -58,6 +60,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
 
 /**
  * A {@link BrokerService} that watches kafka zk nodes for updates of broker lists and leader for
@@ -136,11 +139,13 @@
       return brokerList.get();
     }
 
-    final SettableFuture<?> readerFuture = SettableFuture.create();
-    final AtomicReference<Iterable<BrokerInfo>> brokers =
-      new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());
+    final SettableFuture<?> readyFuture = SettableFuture.create();
+    final AtomicReference<List<BrokerInfo>> brokers = new AtomicReference<>(Collections.<BrokerInfo>emptyList());
 
     actOnExists(BROKER_IDS_PATH, new Runnable() {
+
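+      // Capture a reference to this Runnable so that the watcher below can re-register it
+      // after the broker ids node gets deleted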
+      final Runnable thisRunnable = this;
+
       @Override
       public void run() {
         // Callback for fetching children list. This callback should be executed in the executorService.
@@ -154,19 +159,19 @@
                   Iterables.transform(
                     brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
                     Suppliers.<BrokerInfo>supplierFunction())));
-              readerFuture.set(null);
+              readyFuture.set(null);
 
               for (ListenerExecutor listener : listeners) {
                 listener.changed(ZKBrokerService.this);
               }
             } catch (ExecutionException e) {
-              readerFuture.setException(e.getCause());
+              readyFuture.setException(e.getCause());
             }
           }
 
           @Override
           public void onFailure(Throwable t) {
-            readerFuture.setException(t);
+            readyFuture.setException(t);
           }
         };
 
@@ -179,15 +184,25 @@
             }
             if (event.getType() == Event.EventType.NodeChildrenChanged) {
               Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
+            } else if (event.getType() == Event.EventType.NodeDeleted) {
+              // If the ids node is deleted, clear the broker list and re-watch.
+              // This could happen when the Kafka server is restarted and the ZK node was cleaned up.
+              // The readyFuture for this call doesn't matter, as we don't need to block on anything.
+              brokers.set(Collections.<BrokerInfo>emptyList());
+              for (ListenerExecutor listener : listeners) {
+                listener.changed(ZKBrokerService.this);
+              }
+              actOnExists(BROKER_IDS_PATH, thisRunnable, SettableFuture.create(),
+                          FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
             }
           }
         }), childrenCallback, executorService);
       }
-    }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
+    }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
 
-    brokerList = createSupplier(brokers);
+    brokerList = this.<Iterable<BrokerInfo>>createSupplier(brokers);
     try {
-      readerFuture.get();
+      readyFuture.get();
     } catch (Exception e) {
       throw Throwables.propagate(e);
     }
@@ -223,7 +238,7 @@
       public Supplier<T> load(final K key) throws Exception {
         // A future to tell if the result is ready, even it is failure.
         final SettableFuture<T> readyFuture = SettableFuture.create();
-        final AtomicReference<T> resultValue = new AtomicReference<T>();
+        final AtomicReference<T> resultValue = new AtomicReference<>();
 
         // Fetch for node data when it exists.
         final String path = key.getPath();
@@ -312,7 +327,7 @@
       }
     }), new FutureCallback<Stat>() {
       @Override
-      public void onSuccess(Stat result) {
+      public void onSuccess(@Nullable Stat result) {
         if (result != null) {
           action.run();
         } else {
@@ -345,7 +360,7 @@
   /**
    * Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}.
    */
-  private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) {
+  private <T> Supplier<T> createSupplier(final AtomicReference<? extends T> ref) {
     return new Supplier<T>() {
       @Override
       public T get() {
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index aa14a75..c219171 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -59,7 +59,7 @@
 public class Hadoop21YarnAppClient implements YarnAppClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
-  private final Configuration configuration;
+  protected final Configuration configuration;
 
   public Hadoop21YarnAppClient(Configuration configuration) {
     this.configuration = configuration;
@@ -108,7 +108,7 @@
             addRMToken(launchContext, yarnClient, appId);
             appSubmissionContext.setAMContainerSpec(launchContext);
             appSubmissionContext.setResource(capability);
-            appSubmissionContext.setMaxAppAttempts(2);
+            configureAppSubmissionContext(appSubmissionContext);
 
             yarnClient.submitApplication(appSubmissionContext);
             return new ProcessControllerImpl(appId);
@@ -126,6 +126,19 @@
     }
   }
 
+  /**
+   * Updates the {@link ApplicationSubmissionContext} based on configuration.
+   */
+  protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+    int maxAttempts = configuration.getInt(Configs.Keys.YARN_MAX_APP_ATTEMPTS, -1);
+    if (maxAttempts > 0) {
+      context.setMaxAppAttempts(maxAttempts);
+    } else {
+      // Preserve the old behavior
+      context.setMaxAppAttempts(2);
+    }
+  }
+
   private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
     int maxMemory = response.getMaximumResourceCapability().getMemory();
     int updatedMemory = capability.getMemory();
diff --git a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
index 97d2a64..0e3382f 100644
--- a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
@@ -48,14 +48,12 @@
  * </p>
  */
 @SuppressWarnings("unused")
-public final class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
+public class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
 
   private static final Logger LOG = LoggerFactory.getLogger(Hadoop23YarnAppClient.class);
-  private final Configuration configuration;
 
   public Hadoop23YarnAppClient(Configuration configuration) {
     super(configuration);
-    this.configuration = configuration;
   }
 
   /**
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
new file mode 100644
index 0000000..1c27518
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
@@ -0,0 +1,48 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.twill.api.Configs;
+
+/**
+ * <p>
+ * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.6 and beyond.
+ *
+ * The {@link VersionDetectYarnAppClientFactory} class will return an instance of this class for
+ * Apache Hadoop 2.6 and beyond.
+ * </p>
+ */
+@SuppressWarnings("unused")
+public class Hadoop26YarnAppClient extends Hadoop23YarnAppClient {
+
+  public Hadoop26YarnAppClient(Configuration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+    super.configureAppSubmissionContext(context);
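+    // setAttemptFailuresValidityInterval is only available starting from Hadoop 2.6 (YARN-611).
+    // Attempt failures older than the given interval do not count toward the max attempts limit.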
+    long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L);
+    if (interval > 0) {
+      context.setAttemptFailuresValidityInterval(interval);
+    }
+  }
+}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 445656d..7706d52 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -165,9 +165,10 @@
 
     @Override
     protected void startUp() throws Exception {
-      ZKOperations.ignoreError(
-        zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
-        KeeperException.NodeExistsException.class, kafkaZKPath).get();
+      // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+      // no leftover content from a previous AM attempt.
+      LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+      ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
       kafkaServer.startAndWait();
     }
 
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 6fc31f5..8a80041 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -78,7 +78,9 @@
 import org.apache.twill.internal.yarn.YarnUtils;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -318,8 +320,12 @@
 
     instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
 
-    // Creates ZK path for runnable
-    zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
+    // Creates the ZK path for runnables. It's ok if the path already exists,
+    // which is the case when the AM gets killed and restarted.
+    ZKOperations.ignoreError(
+      zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+      KeeperException.NodeExistsException.class, null)
+      .get();
     runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
     runnableContainerRequests = initContainerRequests();
   }
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index c8e88c9..83de2a4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -39,9 +39,13 @@
           // 2.1 and 2.2 uses the same YarnAppClient
           clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
           break;
-        default:
+        case HADOOP_23:
           // 2.3 and above uses the 2.3 YarnAppClient to support RM HA
           clzName = getClass().getPackage().getName() + ".Hadoop23YarnAppClient";
+          break;
+        default:
+          // Hadoop 2.6 and beyond uses the 2.6 YarnAppClient
+          clzName = getClass().getPackage().getName() + ".Hadoop26YarnAppClient";
       }
       Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
       return clz.getConstructor(Configuration.class).newInstance(configuration);
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 335d7ec..8f844a2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -21,6 +21,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,6 +69,7 @@
   private final TimeUnit startTimeoutUnit;
   private volatile ApplicationMasterLiveNodeData amLiveNodeData;
   private ProcessController<YarnApplicationReport> processController;
+  private ApplicationAttemptId currentAttemptId;
 
   // Thread for polling yarn for application status if application got ZK session expire.
   // Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback.
@@ -141,6 +143,8 @@
         LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId);
         forceShutDown();
       }
+
+      currentAttemptId = report.getCurrentApplicationAttemptId();
     } catch (Exception e) {
       throw Throwables.propagate(e);
     }
@@ -273,6 +277,13 @@
               shutdown = true;
               break;
             }
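+            // An attempt ID change means the AM was restarted; reset the log handler so that
+            // logs are consumed from the new AM's embedded Kafka from the beginning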
+            ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
+            if (currentAttemptId.compareTo(attemptId) != 0) {
+              LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId);
+              currentAttemptId = attemptId;
+              resetLogHandler();
+            }
+
             // Make a sync exists call to instance node and re-watch if the node exists
             try {
               // The timeout is arbitrary, as it's just for avoiding block forever
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
new file mode 100644
index 0000000..4f5adce
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
@@ -0,0 +1,189 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for application master resilience.
+ */
+public class AppRecoveryTestRun extends BaseYarnTest {
+
+  @ClassRule
+  public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+  @Test
+  public void testAMRestart() throws Exception {
+    // Only run it with Hadoop-2.1 or above
+    Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0);
+    // Don't run this test on Mac, as there would be leftover java processes (HADOOP-12317)
+    // The test can be forced to run by turning on the "force-mac-tests" maven profile
+    // After the test finishes, run the `jps` command and kill all `TwillLauncher` processes
+    Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) ||
+                      !System.getProperty("os.name").toLowerCase().contains("mac"));
+
+    File watchFile = TEMP_FOLDER.newFile();
+    watchFile.delete();
+
+    // Start the testing app and wait for four matching log lines: one from the event handler (AM)
+    // and one from the runnable for each of the two application attempts
+    final Semaphore semaphore = new Semaphore(0);
+    TwillRunner runner = getTwillRunner();
+    TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile)))
+      .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+      // Use a log handler to match messages from the AM and the Runnable to make sure log
+      // collection gets resumed correctly after the AM restarts
+      .addLogHandler(new LogHandler() {
+        @Override
+        public void onLog(LogEntry logEntry) {
+          String message = logEntry.getMessage();
+          if (message.equals("Container for " + TestRunnable.class.getSimpleName() + " launched")) {
+            semaphore.release();
+          } else if (message.equals("Running 0")) {
+            semaphore.release();
+          }
+        }
+      })
+      .start();
+
+    // Wait for the first attempt to be running
+    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+    // Touch the watchFile so that the event handler will kill the AM
+    Files.touch(watchFile);
+    // Wait for the second attempt to be running
+    Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+
+    controller.terminate().get();
+  }
+
+  /**
+   * An {@link EventHandler} that kills the first attempt of the application.
+   */
+  public static final class TestEventHandler extends EventHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class);
+
+    private final File watchFile;
+
+    TestEventHandler(File watchFile) {
+      this.watchFile = watchFile;
+    }
+
+    @Override
+    public void containerLaunched(String runnableName, int instanceId, String containerId) {
+      LOG.info("Container for {} launched", runnableName);
+
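+      // YARN container IDs embed the application attempt number
+      // (e.g. container_<clusterTimestamp>_<appId>_01_<sequence>), so matching "_01_"
+      // selects containers launched by the first attempt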
+      if (containerId.contains("_01_")) {
+        final File watchFile = new File(context.getSpecification().getConfigs().get("watchFile"));
+        Thread t = new Thread() {
+          @Override
+          public void run() {
+            // Wait for the watch file to be available, then kill the process
+            while (!watchFile.exists()) {
+              Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+            Runtime.getRuntime().halt(-1);
+          }
+        };
+        t.setDaemon(true);
+        t.start();
+      }
+    }
+
+    @Override
+    protected Map<String, String> getConfigs() {
+      return Collections.singletonMap("watchFile", watchFile.getAbsolutePath());
+    }
+  }
+
+  /**
+   * Application for testing
+   */
+  public static final class TestApp implements TwillApplication {
+
+    private final EventHandler eventHandler;
+
+    public TestApp(EventHandler eventHandler) {
+      this.eventHandler = eventHandler;
+    }
+
+    @Override
+    public TwillSpecification configure() {
+      return TwillSpecification.Builder.with()
+        .setName("TestApp")
+        .withRunnable()
+        .add(new TestRunnable()).noLocalFiles()
+        .anyOrder()
+        .withEventHandler(eventHandler).build();
+    }
+  }
+
+  /**
+   * Runnable for testing
+   */
+  public static final class TestRunnable extends AbstractTwillRunnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class);
+
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    @Override
+    public void run() {
+      long count = 0;
+      try {
+        while (!stopLatch.await(2, TimeUnit.SECONDS)) {
+          LOG.info("Running {}", count++);
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted", e);
+      }
+    }
+
+    @Override
+    public void stop() {
+      stopLatch.countDown();
+    }
+  }
+}
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
index 0e2239d..bce6391 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -25,16 +25,21 @@
 import org.apache.twill.common.Cancellable;
 import org.apache.twill.common.Threads;
 import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
 
 /**
  * Collection of helper methods for common operations that usually needed when interacting with ZooKeeper.
@@ -282,6 +287,73 @@
   }
 
   /**
+   * Creates a ZK node at the given path. If the node already exists, it will be deleted recursively
+   * and the creation will be retried.
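+   *
+   * @param zkClient the {@link ZKClient} to perform the operations with
+   * @param path the path of the node to create
+   * @param data the data to set on the created node, or {@code null} for no data
+   * @param createMode the {@link CreateMode} of the node to create
+   * @param createParent {@code true} to create any missing parent nodes first
+   * @param acls the {@link ACL}s to set on the node; {@code OPEN_ACL_UNSAFE} is used when none is given
+   * @return A {@link OperationFuture} that will be completed when the creation is done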
+   */
+  public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
+                                                             @Nullable final byte[] data, final CreateMode createMode,
+                                                             final boolean createParent, final ACL...acls) {
+    final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
+                                                                                        Threads.SAME_THREAD_EXECUTOR);
+    final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+    createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
+
+      final FutureCallback<String> createCallback = this;
+
+      @Override
+      public void onSuccess(String result) {
+        // Create succeeded, just set the result to the resultFuture
+        resultFuture.set(result);
+      }
+
+      @Override
+      public void onFailure(final Throwable createFailure) {
+        // If create failed for a reason other than NodeExistsException, just set the exception on the result future
+        if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+          resultFuture.setException(createFailure);
+          return;
+        }
+
+        // Try to delete the path
+        LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+        Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
+          @Override
+          public void onSuccess(String result) {
+            // If delete succeeded, perform the creation again.
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            // If deletion failed with something other than NoNodeException, fail the resulting operation future
+            if (!(t instanceof KeeperException.NoNodeException)) {
+              createFailure.addSuppressed(t);
+              resultFuture.setException(createFailure);
+              return;
+            }
+
+            // If the delete failed because the node no longer exists, just go ahead and recreate the node
+            createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+          }
+        }, Threads.SAME_THREAD_EXECUTOR);
+      }
+    });
+
+    return resultFuture;
+  }
+
+  /**
+   * Private helper method to create a ZK node based on the given parameters. The result of the creation is
+   * always communicated via the provided {@link FutureCallback}.
+   */
+  private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+                                 CreateMode createMode, boolean createParent,
+                                 Iterable<ACL> acls, FutureCallback<String> callback) {
+    Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
+                        callback, Threads.SAME_THREAD_EXECUTOR);
+  }
+
+  /**
    * Watch for the given path until it exists.
    * @param zkClient The {@link ZKClient} to use.
    * @param path A ZooKeeper path to watch for existent.