Merge pull request #20 from nadav-har-tzvi/version-0.2.0-incubating-rc3
PySpark fixes for YARN and Mesos
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
index dc4f15e..e3c2812 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -110,8 +111,9 @@
List<String> commands = Collections.singletonList(
- "env AMA_NODE=" + System.getenv("AMA_NODE") + " " +
- "$JAVA_HOME/bin/java" +
+ "env AMA_NODE=" + System.getenv("AMA_NODE") +
+ " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
+ " $JAVA_HOME/bin/java" +
" -Dscala.usejavacp=false" +
" -Xmx1G" +
" org.apache.amaterasu.leader.yarn.ApplicationMaster " +
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index a44202a..1828100 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -21,8 +21,8 @@
import java.nio.ByteBuffer
import java.util
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-import javax.jms.Session
+import javax.jms.Session
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.broker.BrokerService
import org.apache.amaterasu.common.configuration.ClusterConfig
@@ -153,21 +153,14 @@
// TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
- rmClient = AMRMClientAsync.createAMRMClientAsync(1000, this)
- rmClient.init(conf)
- rmClient.start()
-
- // Register with ResourceManager
- log.info("Registering application")
- val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
- log.info("Registered application")
+ rmClient = startRMClient()
+ val registrationResponse = registerAppMaster("", 0, "")
val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
log.info("Max mem capability of resources in this cluster " + maxMem)
val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
log.info("Max vcores capability of resources in this cluster " + maxVCores)
log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.registeredActions.size}")
-
// Resource requirements for worker containers
this.capability = Records.newRecord(classOf[Resource])
val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
@@ -194,6 +187,21 @@
log.info("Finished asking for containers")
}
+  /**
+   * Builds the asynchronous ResourceManager client, initialises it with `conf`
+   * and starts it. `this` is handed over as the client's callback handler.
+   *
+   * @return the started client, ready for `registerApplicationMaster`
+   */
+  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+    // first argument of createAMRMClientAsync: interval in ms between RM calls
+    val intervalMs = 1000
+    val asyncClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](intervalMs, this)
+    asyncClient.init(conf)
+    asyncClient.start()
+    asyncClient
+  }
+
+  /**
+   * Registers this ApplicationMaster with the ResourceManager via the
+   * already-started `rmClient`, logging before and after the call.
+   *
+   * @param host host name reported to the ResourceManager
+   * @param port port reported to the ResourceManager
+   * @param url  tracking URL reported to the ResourceManager
+   * @return the ResourceManager's registration response (used by the caller
+   *         to read the cluster's maximum resource capability)
+   */
+  private def registerAppMaster(host: String, port: Int, url: String) = {
+    log.info("Registering application")
+    val response = rmClient.registerApplicationMaster(host, port, url)
+    log.info("Registered application")
+    response
+  }
+
private def setupMessaging(jobId: String): Unit = {
val cf = new ActiveMQConnectionFactory(address)
@@ -225,20 +233,6 @@
override def onContainersAllocated(containers: util.List[Container]): Unit = {
- // creating the credentials for container execution
- val credentials = UserGroupInformation.getCurrentUser.getCredentials
- val dob = new DataOutputBuffer
- credentials.writeTokenStorageToStream(dob)
-
- // removing the AM->RM token so that containers cannot access it.
- val iter = credentials.getAllTokens.iterator
- log.info("Executing with tokens:")
- for (token <- iter) {
- log.info(token.toString)
- if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
- }
- val allTokens = ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-
log.info(s"${containers.size()} Containers allocated")
for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
if (actionsBuffer.isEmpty) {
@@ -294,7 +288,8 @@
ctx.setEnvironment(Map[String, String](
"HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
"YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
- "AMA_NODE" -> sys.env("AMA_NODE")
+ "AMA_NODE" -> sys.env("AMA_NODE"),
+ "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
))
log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
@@ -316,6 +311,22 @@
}
}
+  /**
+   * Serializes the current user's credentials into a buffer suitable for a
+   * container launch context, with the AM->RM token removed so containers
+   * cannot use it.
+   *
+   * The token is stripped BEFORE `writeTokenStorageToStream`; removing it
+   * after serialization (as the previous version did) left it in the bytes
+   * shipped to containers.
+   *
+   * Being a `def`, this is recomputed (and re-logged) on every access.
+   * NOTE(review): assumes `UserGroupInformation.getCurrentUser.getCredentials`
+   * returns a copy, so the removal does not affect the AM's own RM token.
+   * Confirm for the Hadoop version in use.
+   *
+   * @return the serialized tokens, wrapped without copying
+   */
+  private def allTokens: ByteBuffer = {
+    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+
+    // Remove the AM->RM token first so it never reaches the serialized output.
+    val iter = credentials.getAllTokens.iterator
+    log.info("Executing with tokens:")
+    while (iter.hasNext) {
+      val token = iter.next()
+      log.info(token.toString)
+      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+    }
+
+    // Only now serialize the remaining tokens for the containers.
+    val dob = new DataOutputBuffer
+    credentials.writeTokenStorageToStream(dob)
+    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+  }
+
private def setupResources(frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
val sourcePath = Path.mergePaths(jarPath, new Path(s"/$resourcesPath"))