(TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed
- Delete the Kafka root zk node for the application if already exist
- Delete the AM instance zk node if already exist
- For runnables parent zk node, it is not an error if it already exist
- Enhance KafkaClient publisher / consumer to deal with Kafka cluster changes
- When AM killed and restarted, the embedded Kafka will be running in different host and port
This closes #67 on Github.
Signed-off-by: Terence Yim <chtyim@apache.org>
diff --git a/pom.xml b/pom.xml
index 84c4b8c..45aa64d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -180,6 +180,7 @@
<jopt-simple.version>3.2</jopt-simple.version>
<commons-compress.version>1.5</commons-compress.version>
<hadoop20.output.dir>target/hadoop20-classes</hadoop20.output.dir>
+ <force.mac.tests>false</force.mac.tests>
</properties>
<scm>
@@ -346,6 +347,7 @@
<redirectTestOutputToFile>${surefire.redirectTestOutputToFile}</redirectTestOutputToFile>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
+ <force.mac.tests>${force.mac.tests}</force.mac.tests>
</systemPropertyVariables>
<reuseForks>false</reuseForks>
<reportFormat>plain</reportFormat>
@@ -362,6 +364,18 @@
<profiles>
<profile>
+ <!--
+ This profile is to force certain tests to run on Mac.
+ Those tests are disabled due to orphan processes left after the test run (HADOOP-12317).
+ If this profile is enabled, after the test finished, run the `jps` command
+ and delete all `TwillLauncher` processes
+ -->
+ <id>force-mac-tests</id>
+ <properties>
+ <force.mac.tests>true</force.mac.tests>
+ </properties>
+ </profile>
+ <profile>
<id>apache-release</id>
<build>
<plugins>
diff --git a/twill-api/src/main/java/org/apache/twill/api/Configs.java b/twill-api/src/main/java/org/apache/twill/api/Configs.java
index 9a21489..20a25f6 100644
--- a/twill-api/src/main/java/org/apache/twill/api/Configs.java
+++ b/twill-api/src/main/java/org/apache/twill/api/Configs.java
@@ -79,6 +79,18 @@
public static final String YARN_AM_RESERVED_MEMORY_MB = "twill.yarn.am.reserved.memory.mb";
/**
+ * Maximum number of attempts to run the application by YARN if there is failure.
+ */
+ public static final String YARN_MAX_APP_ATTEMPTS = "twill.yarn.max.app.attempts";
+
+ /**
+ * Interval time in milliseconds for the attempt failures validity interval in YARN. YARN only limit to
+ * the maximum attempt count for failures in the given interval.
+ */
+ public static final String YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL =
+ "twill.yarn.attempt.failures.validity.interval";
+
+ /**
* Setting for enabling log collection.
*/
public static final String LOG_COLLECTION_ENABLED = "twill.log.collection.enabled";
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
index fd8a939..0ff2fc8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillController.java
@@ -222,6 +222,24 @@
return sendMessage(SystemMessages.resetLogLevels(runnableName, Sets.newHashSet(loggerNames)), loggerNames);
}
+ /**
+ * Reset the log handler to poll from the beginning of Kafka.
+ */
+ protected final synchronized void resetLogHandler() {
+ if (kafkaClient == null) {
+ return;
+ }
+ if (logCancellable != null) {
+ logCancellable.cancel();
+ logCancellable = null;
+ }
+ if (!logHandlers.isEmpty()) {
+ logCancellable = kafkaClient.getConsumer().prepare()
+ .addFromBeginning(Constants.LOG_TOPIC, 0)
+ .consume(new LogMessageCallback(logHandlers));
+ }
+ }
+
private void validateInstanceIds(String runnable, Set<Integer> instanceIds) {
ResourceReport resourceReport = getResourceReport();
if (resourceReport == null) {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
index 8e73653..425cd43 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java
@@ -206,7 +206,7 @@
}
/**
- * Update the live node for the runnable.
+ * Update the live node for the service.
*
* @return A {@link OperationFuture} that will be completed when the update is done.
*/
@@ -216,11 +216,15 @@
return zkClient.setData(liveNodePath, serializeLiveNode());
}
+ /**
+ * Creates the live node for the service. If the node already exists, it will be deleted before creation.
+ *
+ * @return A {@link OperationFuture} that will be completed when the creation is done.
+ */
private OperationFuture<String> createLiveNode() {
- String liveNodePath = getLiveNodePath();
- LOG.info("Create live node {}{}", zkClient.getConnectString(), liveNodePath);
- return ZKOperations.ignoreError(zkClient.create(liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL),
- KeeperException.NodeExistsException.class, liveNodePath);
+ final String liveNodePath = getLiveNodePath();
+ LOG.info("Creating live node {}{}", zkClient.getConnectString(), liveNodePath);
+ return ZKOperations.createDeleteIfExists(zkClient, liveNodePath, serializeLiveNode(), CreateMode.EPHEMERAL, true);
}
private OperationFuture<String> removeLiveNode() {
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
index 73235b7..f69350e 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
@@ -340,17 +340,17 @@
continue;
}
- // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
- // from kafak server.
- long off = offset.get();
- if (off < 0) {
- offset.set(getLastOffset(topicPart, off));
- }
-
- SimpleConsumer consumer = consumerEntry.getValue();
-
- // Fire a fetch message request
try {
+ // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
+ // from kafak server.
+ long off = offset.get();
+ if (off < 0) {
+ offset.set(getLastOffset(topicPart, off));
+ }
+
+ SimpleConsumer consumer = consumerEntry.getValue();
+
+ // Fire a fetch message request
FetchResponse response = fetchMessages(consumer, offset.get());
// Failure response, set consumer entry to null and let next round of loop to handle it.
@@ -364,6 +364,7 @@
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
+ backoff.backoff();
continue;
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index f147d24..e5d0f8d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -54,11 +54,11 @@
private final AtomicReference<Producer<Integer, ByteBuffer>> producer;
private final AtomicBoolean listenerCancelled;
- public SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
+ SimpleKafkaPublisher(BrokerService brokerService, Ack ack, Compression compression) {
this.brokerService = brokerService;
this.ack = ack;
this.compression = compression;
- this.producer = new AtomicReference<Producer<Integer, ByteBuffer>>();
+ this.producer = new AtomicReference<>();
this.listenerCancelled = new AtomicBoolean(false);
}
@@ -107,7 +107,7 @@
@Override
public Preparer add(ByteBuffer message, Object partitionKey) {
- messages.add(new KeyedMessage<Integer, ByteBuffer>(topic, Math.abs(partitionKey.hashCode()), message));
+ messages.add(new KeyedMessage<>(topic, Math.abs(partitionKey.hashCode()), message));
return this;
}
@@ -159,30 +159,37 @@
}
String newBrokerList = brokerService.getBrokerList();
- if (newBrokerList.isEmpty()) {
- LOG.warn("Broker list is empty. No Kafka producer is created.");
- return;
- }
+ // If there is no change, whether it is empty or not, just return
if (Objects.equal(brokerList, newBrokerList)) {
return;
}
- Properties props = new Properties();
- props.put("metadata.broker.list", newBrokerList);
- props.put("serializer.class", ByteBufferEncoder.class.getName());
- props.put("key.serializer.class", IntegerEncoder.class.getName());
- props.put("partitioner.class", IntegerPartitioner.class.getName());
- props.put("request.required.acks", Integer.toString(ack.getAck()));
- props.put("compression.codec", compression.getCodec());
+ Producer<Integer, ByteBuffer> newProducer = null;
+ if (!newBrokerList.isEmpty()) {
+ Properties props = new Properties();
+ props.put("metadata.broker.list", newBrokerList);
+ props.put("serializer.class", ByteBufferEncoder.class.getName());
+ props.put("key.serializer.class", IntegerEncoder.class.getName());
+ props.put("partitioner.class", IntegerPartitioner.class.getName());
+ props.put("request.required.acks", Integer.toString(ack.getAck()));
+ props.put("compression.codec", compression.getCodec());
- ProducerConfig config = new ProducerConfig(props);
- Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(new Producer<Integer, ByteBuffer>(config));
+ ProducerConfig config = new ProducerConfig(props);
+ newProducer = new Producer<>(config);
+ }
+
+ // If the broker list is empty, the producer will be set to null
+ Producer<Integer, ByteBuffer> oldProducer = producer.getAndSet(newProducer);
if (oldProducer != null) {
oldProducer.close();
}
- LOG.info("Update Kafka producer broker list: {}", newBrokerList);
+ if (newBrokerList.isEmpty()) {
+ LOG.warn("Empty Kafka producer broker list, publish will fail.");
+ } else {
+ LOG.info("Updated Kafka producer broker list: {}", newBrokerList);
+ }
brokerList = newBrokerList;
}
}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
index 2ffc604..de42b9b 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/ZKBrokerService.java
@@ -51,6 +51,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -58,6 +60,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
/**
* A {@link BrokerService} that watches kafka zk nodes for updates of broker lists and leader for
@@ -136,11 +139,13 @@
return brokerList.get();
}
- final SettableFuture<?> readerFuture = SettableFuture.create();
- final AtomicReference<Iterable<BrokerInfo>> brokers =
- new AtomicReference<Iterable<BrokerInfo>>(ImmutableList.<BrokerInfo>of());
+ final SettableFuture<?> readyFuture = SettableFuture.create();
+ final AtomicReference<List<BrokerInfo>> brokers = new AtomicReference<>(Collections.<BrokerInfo>emptyList());
actOnExists(BROKER_IDS_PATH, new Runnable() {
+
+ final Runnable thisRunnable = this;
+
@Override
public void run() {
// Callback for fetching children list. This callback should be executed in the executorService.
@@ -154,19 +159,19 @@
Iterables.transform(
brokerInfos.getAll(Iterables.transform(result.getChildren(), BROKER_ID_TRANSFORMER)).values(),
Suppliers.<BrokerInfo>supplierFunction())));
- readerFuture.set(null);
+ readyFuture.set(null);
for (ListenerExecutor listener : listeners) {
listener.changed(ZKBrokerService.this);
}
} catch (ExecutionException e) {
- readerFuture.setException(e.getCause());
+ readyFuture.setException(e.getCause());
}
}
@Override
public void onFailure(Throwable t) {
- readerFuture.setException(t);
+ readyFuture.setException(t);
}
};
@@ -179,15 +184,25 @@
}
if (event.getType() == Event.EventType.NodeChildrenChanged) {
Futures.addCallback(zkClient.getChildren(BROKER_IDS_PATH, this), childrenCallback, executorService);
+ } else if (event.getType() == Event.EventType.NodeDeleted) {
+ // If the ids node is deleted, clear the broker list and re-watch.
+ // This could happen when the Kafka server is restarted and have the ZK node cleanup
+ // The readyFuture for this call doesn't matter, as we don't need to block on anything
+ brokers.set(Collections.<BrokerInfo>emptyList());
+ for (ListenerExecutor listener : listeners) {
+ listener.changed(ZKBrokerService.this);
+ }
+ actOnExists(BROKER_IDS_PATH, thisRunnable, SettableFuture.create(),
+ FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
}
}
}), childrenCallback, executorService);
}
- }, readerFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
+ }, readyFuture, FAILURE_RETRY_SECONDS, TimeUnit.SECONDS);
- brokerList = createSupplier(brokers);
+ brokerList = this.<Iterable<BrokerInfo>>createSupplier(brokers);
try {
- readerFuture.get();
+ readyFuture.get();
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -223,7 +238,7 @@
public Supplier<T> load(final K key) throws Exception {
// A future to tell if the result is ready, even it is failure.
final SettableFuture<T> readyFuture = SettableFuture.create();
- final AtomicReference<T> resultValue = new AtomicReference<T>();
+ final AtomicReference<T> resultValue = new AtomicReference<>();
// Fetch for node data when it exists.
final String path = key.getPath();
@@ -312,7 +327,7 @@
}
}), new FutureCallback<Stat>() {
@Override
- public void onSuccess(Stat result) {
+ public void onSuccess(@Nullable Stat result) {
if (result != null) {
action.run();
} else {
@@ -345,7 +360,7 @@
/**
* Creates a supplier that always return latest copy from an {@link java.util.concurrent.atomic.AtomicReference}.
*/
- private <T> Supplier<T> createSupplier(final AtomicReference<T> ref) {
+ private <T> Supplier<T> createSupplier(final AtomicReference<? extends T> ref) {
return new Supplier<T>() {
@Override
public T get() {
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index aa14a75..c219171 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -59,7 +59,7 @@
public class Hadoop21YarnAppClient implements YarnAppClient {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAppClient.class);
- private final Configuration configuration;
+ protected final Configuration configuration;
public Hadoop21YarnAppClient(Configuration configuration) {
this.configuration = configuration;
@@ -108,7 +108,7 @@
addRMToken(launchContext, yarnClient, appId);
appSubmissionContext.setAMContainerSpec(launchContext);
appSubmissionContext.setResource(capability);
- appSubmissionContext.setMaxAppAttempts(2);
+ configureAppSubmissionContext(appSubmissionContext);
yarnClient.submitApplication(appSubmissionContext);
return new ProcessControllerImpl(appId);
@@ -126,6 +126,19 @@
}
}
+ /**
+ * Updates the {@link ApplicationSubmissionContext} based on configuration.
+ */
+ protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+ int maxAttempts = configuration.getInt(Configs.Keys.YARN_MAX_APP_ATTEMPTS, -1);
+ if (maxAttempts > 0) {
+ context.setMaxAppAttempts(maxAttempts);
+ } else {
+ // Preserve the old behavior
+ context.setMaxAppAttempts(2);
+ }
+ }
+
private Resource adjustMemory(GetNewApplicationResponse response, Resource capability) {
int maxMemory = response.getMaximumResourceCapability().getMemory();
int updatedMemory = capability.getMemory();
diff --git a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
index 97d2a64..0e3382f 100644
--- a/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop23/org/apache/twill/internal/yarn/Hadoop23YarnAppClient.java
@@ -48,14 +48,12 @@
* </p>
*/
@SuppressWarnings("unused")
-public final class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
+public class Hadoop23YarnAppClient extends Hadoop21YarnAppClient {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop23YarnAppClient.class);
- private final Configuration configuration;
public Hadoop23YarnAppClient(Configuration configuration) {
super(configuration);
- this.configuration = configuration;
}
/**
diff --git a/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
new file mode 100644
index 0000000..1c27518
--- /dev/null
+++ b/twill-yarn/src/main/hadoop26/org/apache/twill/internal/yarn/Hadoop26YarnAppClient.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.twill.api.Configs;
+
+/**
+ * <p>
+ * The service implementation of {@link YarnAppClient} for Apache Hadoop 2.6 and beyond.
+ *
+ * The {@link VersionDetectYarnAppClientFactory} class will decide to return instance of this class for
+ * Apache Hadoop 2.6 and beyond.
+ * </p>
+ */
+@SuppressWarnings("unused")
+public class Hadoop26YarnAppClient extends Hadoop23YarnAppClient {
+
+ public Hadoop26YarnAppClient(Configuration configuration) {
+ super(configuration);
+ }
+
+ @Override
+ protected void configureAppSubmissionContext(ApplicationSubmissionContext context) {
+ super.configureAppSubmissionContext(context);
+ long interval = configuration.getLong(Configs.Keys.YARN_ATTEMPT_FAILURES_VALIDITY_INTERVAL, -1L);
+ if (interval > 0) {
+ context.setAttemptFailuresValidityInterval(interval);
+ }
+ }
+}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 445656d..7706d52 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -165,9 +165,10 @@
@Override
protected void startUp() throws Exception {
- ZKOperations.ignoreError(
- zkClient.create(kafkaZKPath, null, CreateMode.PERSISTENT),
- KeeperException.NodeExistsException.class, kafkaZKPath).get();
+ // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
+ // no left over content from previous AM attempt.
+ LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
+ ZKOperations.createDeleteIfExists(zkClient, kafkaZKPath, null, CreateMode.PERSISTENT, true).get();
kafkaServer.startAndWait();
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 6fc31f5..8a80041 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -78,7 +78,9 @@
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
+import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,8 +320,12 @@
instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));
- // Creates ZK path for runnable
- zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get();
+ // Creates ZK path for runnable. It's ok if the path already exists.
+ // That's for the case when the AM get killed and restarted
+ ZKOperations.ignoreError(
+ zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT),
+ KeeperException.NodeExistsException.class, null)
+ .get();
runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX);
runnableContainerRequests = initContainerRequests();
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index c8e88c9..83de2a4 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -39,9 +39,13 @@
// 2.1 and 2.2 uses the same YarnAppClient
clzName = getClass().getPackage().getName() + ".Hadoop21YarnAppClient";
break;
- default:
+ case HADOOP_23:
// 2.3 and above uses the 2.3 YarnAppClient to support RM HA
clzName = getClass().getPackage().getName() + ".Hadoop23YarnAppClient";
+ break;
+ default:
+ // Anything above 2.3 will be 2.6 and beyond
+ clzName = getClass().getPackage().getName() + ".Hadoop26YarnAppClient";
}
Class<YarnAppClient> clz = (Class<YarnAppClient>) Class.forName(clzName);
return clz.getConstructor(Configuration.class).newInstance(configuration);
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
index 335d7ec..8f844a2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillController.java
@@ -21,6 +21,7 @@
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -68,6 +69,7 @@
private final TimeUnit startTimeoutUnit;
private volatile ApplicationMasterLiveNodeData amLiveNodeData;
private ProcessController<YarnApplicationReport> processController;
+ private ApplicationAttemptId currentAttemptId;
// Thread for polling yarn for application status if application got ZK session expire.
// Only used by the instanceUpdate/Delete method, which is from serialized call from ZK callback.
@@ -141,6 +143,8 @@
LOG.info("Yarn application {} {} is not in running state. Shutting down controller.", appName, appId);
forceShutDown();
}
+
+ currentAttemptId = report.getCurrentApplicationAttemptId();
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -273,6 +277,13 @@
shutdown = true;
break;
}
+ ApplicationAttemptId attemptId = report.getCurrentApplicationAttemptId();
+ if (currentAttemptId.compareTo(attemptId) != 0) {
+ LOG.info("Application attempt ID change from {} to {}", currentAttemptId, attemptId);
+ currentAttemptId = attemptId;
+ resetLogHandler();
+ }
+
// Make a sync exists call to instance node and re-watch if the node exists
try {
// The timeout is arbitrary, as it's just for avoiding block forever
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
new file mode 100644
index 0000000..4f5adce
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/AppRecoveryTestRun.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.EventHandler;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.api.logging.LogHandler;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for application master resilience.
+ */
+public class AppRecoveryTestRun extends BaseYarnTest {
+
+ @ClassRule
+ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+ @Test
+ public void testAMRestart() throws Exception {
+ // Only run it with Hadoop-2.1 or above
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().compareTo(YarnUtils.HadoopVersions.HADOOP_21) >= 0);
+ // Don't run this test in Mac, as there would be leftover java process (HADOOP-12317)
+ // The test can be force to run by turning on the "force-mac-tests" maven profile
+ // After the test finished, run the `jps` command and delete all `TwillLauncher` processes
+ Assume.assumeTrue(Boolean.parseBoolean(System.getProperty("force.mac.tests")) ||
+ !System.getProperty("os.name").toLowerCase().contains("mac"));
+
+ File watchFile = TEMP_FOLDER.newFile();
+ watchFile.delete();
+
+ // Start the testing app, and wait for 4 log lines that match the pattern emitted by the event handler (AM)
+ // and from the runnable
+ final Semaphore semaphore = new Semaphore(0);
+ TwillRunner runner = getTwillRunner();
+ TwillController controller = runner.prepare(new TestApp(new TestEventHandler(watchFile)))
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ // Use a log handler to match messages from AM and the Runnable to make sure the log collection get resumed
+ // correctly after the AM restarted
+ .addLogHandler(new LogHandler() {
+ @Override
+ public void onLog(LogEntry logEntry) {
+ String message = logEntry.getMessage();
+ if (message.equals("Container for " + TestRunnable.class.getSimpleName() + " launched")) {
+ semaphore.release();
+ } else if (message.equals("Running 0")) {
+ semaphore.release();
+ }
+ }
+ })
+ .start();
+
+ // Wait for the first attempt running
+ Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+ // Touch the watchFile so that the event handler will kill the AM
+ Files.touch(watchFile);
+ // Wait for the second attempt running
+ Assert.assertTrue(semaphore.tryAcquire(2, 2, TimeUnit.MINUTES));
+
+ controller.terminate().get();
+ }
+
+ /**
+ * A {@link EventHandler} for killing the first attempt of the application.
+ */
+ public static final class TestEventHandler extends EventHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestEventHandler.class);
+
+ private File watchFile;
+
+ TestEventHandler(File watchFile) {
+ this.watchFile = watchFile;
+ }
+
+ @Override
+ public void containerLaunched(String runnableName, int instanceId, String containerId) {
+ LOG.info("Container for {} launched", runnableName);
+
+ if (containerId.contains("_01_")) {
+ final File watchFile = new File(context.getSpecification().getConfigs().get("watchFile"));
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ // Wait for the watch file to be available, then kill the process
+ while (!watchFile.exists()) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ Runtime.getRuntime().halt(-1);
+ }
+ };
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+ @Override
+ protected Map<String, String> getConfigs() {
+ return Collections.singletonMap("watchFile", watchFile.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Application for testing
+ */
+ public static final class TestApp implements TwillApplication {
+
+ private final EventHandler eventHandler;
+
+ public TestApp(EventHandler eventHandler) {
+ this.eventHandler = eventHandler;
+ }
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("TestApp")
+ .withRunnable()
+ .add(new TestRunnable()).noLocalFiles()
+ .anyOrder()
+ .withEventHandler(eventHandler).build();
+ }
+ }
+
+ /**
+ * Runnable for testing
+ */
+ public static final class TestRunnable extends AbstractTwillRunnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRunnable.class);
+
+ private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+ @Override
+ public void run() {
+ long count = 0;
+ try {
+ while (!stopLatch.await(2, TimeUnit.SECONDS)) {
+ LOG.info("Running {}", count++);
+ }
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted", e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ stopLatch.countDown();
+ }
+ }
+}
diff --git a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
index 0e2239d..bce6391 100644
--- a/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
+++ b/twill-zookeeper/src/main/java/org/apache/twill/zookeeper/ZKOperations.java
@@ -25,16 +25,21 @@
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
/**
* Collection of helper methods for common operations that usually needed when interacting with ZooKeeper.
@@ -282,6 +287,73 @@
}
/**
+ * Creates a ZK node of the given path. If the node already exists, deletion of the node (recursively) will happen
+ * and the creation will be retried.
+ */
+ public static OperationFuture<String> createDeleteIfExists(final ZKClient zkClient, final String path,
+ @Nullable final byte[] data, final CreateMode createMode,
+ final boolean createParent, final ACL...acls) {
+ final SettableOperationFuture<String> resultFuture = SettableOperationFuture.create(path,
+ Threads.SAME_THREAD_EXECUTOR);
+ final List<ACL> createACLs = acls.length == 0 ? ZooDefs.Ids.OPEN_ACL_UNSAFE : Arrays.asList(acls);
+ createNode(zkClient, path, data, createMode, createParent, createACLs, new FutureCallback<String>() {
+
+ final FutureCallback<String> createCallback = this;
+
+ @Override
+ public void onSuccess(String result) {
+ // Create succeeded, just set the result to the resultFuture
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(final Throwable createFailure) {
+ // If create failed not because of the NodeExistsException, just set the exception to the result future
+ if (!(createFailure instanceof KeeperException.NodeExistsException)) {
+ resultFuture.setException(createFailure);
+ return;
+ }
+
+ // Try to delete the path
+ LOG.info("Node {}{} already exists. Deleting it and retry creation", zkClient.getConnectString(), path);
+ Futures.addCallback(recursiveDelete(zkClient, path), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String result) {
+ // If delete succeeded, perform the creation again.
+ createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If deletion failed because of NoNodeException, fail the result operation future
+ if (!(t instanceof KeeperException.NoNodeException)) {
+ createFailure.addSuppressed(t);
+ resultFuture.setException(createFailure);
+ return;
+ }
+
+ // If can't delete because the node no longer exists, just go ahead and recreate the node
+ createNode(zkClient, path, data, createMode, createParent, createACLs, createCallback);
+ }
+ }, Threads.SAME_THREAD_EXECUTOR);
+ }
+ });
+
+ return resultFuture;
+ }
+
+ /**
+ * Private helper method to create a ZK node based on the parameter. The result of the creation is always
+ * communicate via the provided {@link FutureCallback}.
+ */
+ private static void createNode(ZKClient zkClient, String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent,
+ Iterable<ACL> acls, FutureCallback<String> callback) {
+ Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
+ callback, Threads.SAME_THREAD_EXECUTOR);
+ }
+
+ /**
* Watch for the given path until it exists.
* @param zkClient The {@link ZKClient} to use.
* @param path A ZooKeeper path to watch for existent.