(TWILL-81) Use user home directory as the base directory to construct default location factory in YarnTwillRunnerService. Also includes the following bug fixes
- Instead of using "/twill", which in many cases there would be no write permission, use user home directory.
- Fix a bug in ServiceMain that, when creating LocationFactory for container to use, it should be created with the application directory URI.
- Use MiniDFSCluster to run YarnTestSuite
- Fix a bug in YarnTestSuite that a directory given to create the location factory was getting deleted after each test run, which would break test when using MiniDFSCluster
Signed-off-by: Terence Yim <terence@continuuity.com>
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index 740e0e3..6f04ed7 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -118,14 +118,14 @@
if ("hdfs".equals(appDir.getScheme())) {
if (UserGroupInformation.isSecurityEnabled()) {
- return new HDFSLocationFactory(FileSystem.get(conf)).create(appDir);
+ return new HDFSLocationFactory(FileSystem.get(appDir, conf)).create(appDir);
}
String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
if (fsUser == null) {
throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER);
}
- return new HDFSLocationFactory(FileSystem.get(FileSystem.getDefaultUri(conf), conf, fsUser)).create(appDir);
+ return new HDFSLocationFactory(FileSystem.get(appDir, conf, fsUser)).create(appDir);
}
LOG.warn("Unsupported location type {}.", appDir);
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 2b7f049..69564c8 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -94,7 +94,6 @@
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
-import java.net.URI;
import java.net.URL;
import java.util.Collection;
import java.util.Iterator;
@@ -321,7 +320,7 @@
} finally {
try {
// App location cleanup
- cleanupDir(URI.create(System.getenv(EnvKeys.TWILL_APP_DIR)));
+ cleanupDir();
Loggings.forceFlush();
// Sleep a short while to let kafka clients to have chance to fetch the log
TimeUnit.SECONDS.sleep(1);
@@ -332,15 +331,15 @@
}
}
- private void cleanupDir(URI appDir) {
+ private void cleanupDir() {
try {
if (applicationLocation.delete(true)) {
- LOG.info("Application directory deleted: {}", appDir);
+ LOG.info("Application directory deleted: {}", applicationLocation);
} else {
- LOG.warn("Failed to cleanup directory {}.", appDir);
+ LOG.warn("Failed to cleanup directory {}.", applicationLocation);
}
} catch (Exception e) {
- LOG.warn("Exception while cleanup directory {}.", appDir, e);
+ LOG.warn("Exception while cleanup directory {}.", applicationLocation, e);
}
}
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index dadedce..4c16cc1 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -40,6 +40,7 @@
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.security.Credentials;
@@ -134,10 +135,24 @@
private volatile String jvmOptions = null;
+ /**
+ * Creates an instance with a {@link HDFSLocationFactory} created base on the given configuration with the
+ * user home directory as the location factory namespace.
+ *
+ * @param config Configuration of the yarn cluster
+ * @param zkConnect ZooKeeper connection string
+ */
public YarnTwillRunnerService(YarnConfiguration config, String zkConnect) {
- this(config, zkConnect, new HDFSLocationFactory(getFileSystem(config), "/twill"));
+ this(config, zkConnect, createDefaultLocationFactory(config));
}
+ /**
+ * Creates an instance.
+ *
+ * @param config Configuration of the yarn cluster
+ * @param zkConnect ZooKeeper connection string
+ * @param locationFactory Factory to create {@link Location} instances that are readable and writable by this service
+ */
public YarnTwillRunnerService(YarnConfiguration config, String zkConnect, LocationFactory locationFactory) {
this.yarnConfig = config;
this.yarnAppClient = new VersionDetectYarnAppClientFactory().create(config);
@@ -580,9 +595,10 @@
LOG.debug("Secure store for {} {} saved to {}.", application, runId, credentialsLocation.toURI());
}
- private static FileSystem getFileSystem(YarnConfiguration configuration) {
+ private static LocationFactory createDefaultLocationFactory(Configuration configuration) {
try {
- return FileSystem.get(configuration);
+ FileSystem fs = FileSystem.get(configuration);
+ return new HDFSLocationFactory(fs, fs.getHomeDirectory().toUri().getPath());
} catch (IOException e) {
throw Throwables.propagate(e);
}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
index 14b7f67..304c490 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java
@@ -32,6 +32,6 @@
@BeforeClass
public static final void init() throws IOException {
- YarnTestUtils.initOnce(tmpFolder.newFolder());
+ YarnTestUtils.initOnce();
}
}
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 23fc82b..1a5c4bd 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -115,7 +115,7 @@
Assert.assertTrue(YarnTestUtils.waitForSize(apps, 1, 60));
// Creates a new runner service to check it can regain control over running app.
- TwillRunnerService runnerService = YarnTestUtils.createTwillRunnerService(tmpFolder.newFolder());
+ TwillRunnerService runnerService = YarnTestUtils.createTwillRunnerService();
runnerService.startAndWait();
try {
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
index 909c366..bbeb5a2 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
@@ -19,13 +19,15 @@
import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
-import org.apache.twill.filesystem.LocalLocationFactory;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,21 +43,28 @@
private static final Logger LOG = LoggerFactory.getLogger(YarnTestUtils.class);
private static InMemoryZKServer zkServer;
+ private static MiniDFSCluster dfsCluster;
private static MiniYARNCluster cluster;
private static TwillRunnerService runnerService;
private static YarnConfiguration config;
private static final AtomicBoolean once = new AtomicBoolean(false);
- public static final boolean initOnce(File folder) throws IOException {
+ public static final boolean initOnce() throws IOException {
if (once.compareAndSet(false, true)) {
- init(folder);
+ final TemporaryFolder tmpFolder = new TemporaryFolder();
+ tmpFolder.create();
+ init(tmpFolder.newFolder());
// add shutdown hook because we want to initialized/cleanup once
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- finish();
+ try {
+ finish();
+ } finally {
+ tmpFolder.delete();
+ }
}
}));
return true;
@@ -69,7 +78,12 @@
zkServer.startAndWait();
// Start YARN mini cluster
- config = new YarnConfiguration(new Configuration());
+ LOG.info("Starting Mini DFS on path {}", folder);
+ Configuration fsConf = new HdfsConfiguration(new Configuration());
+ fsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, folder.getAbsolutePath());
+ dfsCluster = new MiniDFSCluster.Builder(fsConf).numDataNodes(1).build();
+
+ config = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
if (YarnUtils.isHadoop20()) {
config.set("yarn.resourcemanager.scheduler.class",
@@ -90,7 +104,7 @@
cluster.init(config);
cluster.start();
- runnerService = createTwillRunnerService(folder);
+ runnerService = createTwillRunnerService();
runnerService.startAndWait();
}
@@ -98,6 +112,7 @@
if (once.compareAndSet(true, false)) {
runnerService.stopAndWait();
cluster.stop();
+ dfsCluster.shutdown();
zkServer.stopAndWait();
return true;
@@ -117,9 +132,8 @@
/**
* Creates an unstarted instance of {@link org.apache.twill.api.TwillRunnerService}.
*/
- public static final TwillRunnerService createTwillRunnerService(File folder) throws IOException {
- YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill",
- new LocalLocationFactory(folder));
+ public static final TwillRunnerService createTwillRunnerService() throws IOException {
+ YarnTwillRunnerService runner = new YarnTwillRunnerService(config, zkServer.getConnectionStr() + "/twill");
// disable tests stealing focus
runner.setJVMOptions("-Djava.awt.headless=true");
return runner;