(TWILL-173) Have EmbeddedKafkaServer restart multiple times on bind failure
- Due to a potential race condition between random port generation and the actual binding, there is a possibility that the binding fails.
Signed-off-by: Terence Yim <chtyim@apache.org>
This closes #9 on GitHub
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
index cd86dcb..be6121d 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/EmbeddedKafkaServer.java
@@ -17,16 +17,21 @@
*/
package org.apache.twill.internal.kafka;
+import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.Time;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
+import org.apache.twill.internal.utils.Networks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.BindException;
import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* A {@link com.google.common.util.concurrent.Service} implementation for running an instance of Kafka server in
@@ -34,25 +39,27 @@
*/
public final class EmbeddedKafkaServer extends AbstractIdleService {
- public static final String START_TIMEOUT_RETRIES = "twill.kafka.start.timeout.retries";
+ public static final String START_RETRIES = "twill.kafka.start.timeout.retries";
private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaServer.class);
- private static final String DEFAULT_START_TIMEOUT_RETRIES = "5";
+ private static final String DEFAULT_START_RETRIES = "5";
private final int startTimeoutRetries;
- private final KafkaConfig kafkaConfig;
+ private final Properties properties;
private KafkaServer server;
public EmbeddedKafkaServer(Properties properties) {
- this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_TIMEOUT_RETRIES,
- DEFAULT_START_TIMEOUT_RETRIES));
- this.kafkaConfig = new KafkaConfig(properties);
+ this.startTimeoutRetries = Integer.parseInt(properties.getProperty(START_RETRIES,
+ DEFAULT_START_RETRIES));
+ this.properties = new Properties();
+ this.properties.putAll(properties);
}
@Override
protected void startUp() throws Exception {
int tries = 0;
do {
+ KafkaConfig kafkaConfig = createKafkaConfig(properties);
KafkaServer kafkaServer = createKafkaServer(kafkaConfig);
try {
kafkaServer.startup();
@@ -65,9 +72,14 @@
if (rootCause instanceof ZkTimeoutException) {
// Potentially caused by race condition bug described in TWILL-139.
LOG.warn("Timeout when connecting to ZooKeeper from KafkaServer. Attempt number {}.", tries, rootCause);
+ } else if (rootCause instanceof BindException) {
+ LOG.warn("Kafka failed to bind to port {}. Attempt number {}.", kafkaConfig.port(), tries, rootCause);
} else {
throw e;
}
+
+ // Do a random sleep of 1 to 200 ms (inclusive) before the next attempt
+ TimeUnit.MILLISECONDS.sleep(new Random().nextInt(200) + 1L);
}
} while (server == null && ++tries < startTimeoutRetries);
@@ -107,4 +119,22 @@
}
});
}
+
+ /**
+ * Creates a new {@link KafkaConfig} from a copy of the given {@link Properties}. If the {@code "port"} property
+ * is missing or equal to {@code "0"}, a port obtained from {@link Networks#getRandomPort()} is set on the copy.
+ */
+ private KafkaConfig createKafkaConfig(Properties properties) {
+ Properties prop = new Properties();
+ prop.putAll(properties);
+
+ String port = prop.getProperty("port");
+ if (port == null || "0".equals(port)) {
+ int randomPort = Networks.getRandomPort();
+ Preconditions.checkState(randomPort > 0, "Failed to get random port.");
+ prop.setProperty("port", Integer.toString(randomPort));
+ }
+
+ return new KafkaConfig(prop);
+ }
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index 38a2463..e708fb8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -17,7 +17,6 @@
*/
package org.apache.twill.internal.appmaster;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.logging.Loggings;
-import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.zookeeper.OperationFuture;
@@ -168,12 +166,8 @@
}
private Properties generateKafkaConfig(String kafkaZKConnect) {
- int port = Networks.getRandomPort();
- Preconditions.checkState(port > 0, "Failed to get random port.");
-
Properties prop = new Properties();
prop.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
- prop.setProperty("port", Integer.toString(port));
prop.setProperty("broker.id", "1");
prop.setProperty("socket.send.buffer.bytes", "1048576");
prop.setProperty("socket.receive.buffer.bytes", "1048576");
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
index 7990a5b..e4f789a 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogLevelTestRun.java
@@ -112,7 +112,7 @@
}, Threads.SAME_THREAD_EXECUTOR);
Assert.assertTrue(running.await(200, TimeUnit.SECONDS));
- LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 5L,
+ LogEntry.Level logLevel = waitForLogLevel(controller, LogLevelTestRunnable.class.getSimpleName(), 30L,
TimeUnit.SECONDS);
// Verify we got DEBUG log level.