[GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized Hadoop/Yarn set-up
Author: Timea Magyar <Timea.Magyar@etas.com>
Closes #231 from titikakatoo/yarn_spnego_authentication.
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 1907a95..53e93f9 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -18,14 +18,10 @@
package org.apache.gearpump.experiments.yarn.appmaster
-import java.io.IOException
import java.util.concurrent.TimeUnit
-
import akka.actor._
import akka.util.Timeout
import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
import org.apache.gearpump.cluster.ClientToMaster._
import org.apache.gearpump.cluster.ClusterConfig
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -35,7 +31,6 @@
import org.apache.gearpump.transport.HostPort
import org.apache.gearpump.util._
import org.slf4j.Logger
-
import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.duration.Duration
@@ -364,22 +359,8 @@
case class WorkerInfo(id: ContainerId, nodeId: NodeId)
+  /**
+   * Resolves the AppMaster actor for the application described by `report`.
+   *
+   * The lookup itself is delegated to `AppMasterResolver.resolveAppMasterAddress`. Like the
+   * HttpClient-based code it replaces, this method makes one more attempt after a short
+   * pause when the first attempt fails.
+   *
+   * @param report YARN application report whose tracking URL is queried
+   * @param system actor system used to look up the resolved actor path
+   * @return reference to the resolved AppMaster actor
+   * @throws java.io.IOException if the address cannot be resolved on the second attempt either
+   */
   def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
-    val get = new GetMethod(appMasterPath)
-    var status = client.executeMethod(get)
+    import org.apache.gearpump.experiments.yarn.client.AppMasterResolver
-    if (status != 200) {
-      // Sleeps a little bit, and try again
-      Thread.sleep(3000)
-      status = client.executeMethod(get)
-    }
-
-    if (status == 200) {
-      AkkaHelper.actorFor(system, get.getResponseBodyAsString)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getTrackingURL} is accessible...")
-    }
+    try {
+      AppMasterResolver.resolveAppMasterAddress(report, system)
+    } catch {
+      // Fully qualified: this change removes the java.io.IOException import from the file.
+      case _: java.io.IOException =>
+        // Sleeps a little bit, and try again
+        Thread.sleep(3000)
+        AppMasterResolver.resolveAppMasterAddress(report, system)
+    }
   }
}
\ No newline at end of file
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 90653e1..c05b4e2 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -19,14 +19,15 @@
package org.apache.gearpump.experiments.yarn.client
import java.io.IOException
-
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.commons.io.IOUtils
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, ApplicationReport}
import org.apache.gearpump.experiments.yarn.glue.YarnClient
import org.apache.gearpump.util.{AkkaHelper, LogUtil}
-
+import org.apache.hadoop.hdfs.web.URLConnectionFactory
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import scala.util.Try
/**
@@ -43,19 +44,8 @@
+ /** Fetches the YARN report for `appId` and resolves the AppMaster address from it. */
private def connect(appId: ApplicationId): ActorRef = {
val report = yarnClient.getApplicationReport(appId)
- val client = new HttpClient()
- val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
- LOG.info(s"appMasterPath=$appMasterPath")
- val get = new GetMethod(appMasterPath)
- val status = client.executeMethod(get)
- if (status == 200) {
- val response = get.getResponseBodyAsString
- LOG.info("Successfully resolved AppMaster address: " + response)
- AkkaHelper.actorFor(system, response)
- } else {
- throw new IOException("Fail to resolve AppMaster address, please make sure " +
- s"${report.getTrackingURL} is accessible...")
- }
+
+ // The HTTP lookup lives in the AppMasterResolver object; YarnAppMaster.getAppMaster
+ // calls the same method, so both entry points resolve the address the same way.
+ AppMasterResolver.resolveAppMasterAddress(report, system)
}
private def retry(fun: => ActorRef, times: Int): ActorRef = {
@@ -75,3 +65,39 @@
result
}
}
+
+object AppMasterResolver {
+  val LOG = LogUtil.getLogger(getClass)
+
+  /**
+   * Resolves the AppMaster address by reading an actor path from
+   * `<tracking URL>/supervisor-actor-path` over HTTP and looking that path up in `system`.
+   *
+   * @param report YARN application report that supplies the tracking URL
+   * @param system actor system used to look up the returned actor path
+   * @return reference to the actor at the path served by the endpoint
+   * @throws IOException if the endpoint cannot be reached, is not an HTTP(S) URL, or does
+   *                     not answer with HTTP 200
+   */
+  def resolveAppMasterAddress(report: ApplicationReport, system: ActorSystem): ActorRef = {
+    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
+    LOG.info(s"appMasterPath=$appMasterPath")
+
+    // NOTE(review): the single-argument openConnection is used here; confirm whether the
+    // SPNEGO-enabled overload is required for the kerberized set-up this change targets.
+    val connectionFactory = URLConnectionFactory
+      .newDefaultURLConnectionFactory(new YarnConfiguration())
+    val connection = connectionFactory.openConnection(new URL(appMasterPath)) match {
+      case http: HttpURLConnection => http
+      case _ =>
+        // Surface an unexpected protocol as IOException instead of a ClassCastException.
+        throw new IOException(
+          s"Fail to resolve AppMaster address, $appMasterPath is not an HTTP(S) URL")
+    }
+    connection.setInstanceFollowRedirects(true)
+
+    val response = try {
+      try {
+        connection.connect()
+      } catch {
+        case e: IOException =>
+          // Log the cause and fail here instead of querying a connection that never opened.
+          LOG.error(s"Failed to connect to AppMaster at $appMasterPath", e)
+          throw e
+      }
+
+      if (connection.getResponseCode != HttpURLConnection.HTTP_OK) {
+        throw new IOException("Fail to resolve AppMaster address, please make sure " +
+          s"${report.getTrackingURL} is accessible...")
+      }
+
+      val stream = connection.getInputStream
+      try {
+        IOUtils.toString(stream, StandardCharsets.UTF_8)
+      } finally {
+        stream.close()
+      }
+    } finally {
+      // Release the connection on every path, including failures while reading the body.
+      connection.disconnect()
+    }
+
+    LOG.info("Successfully resolved AppMaster address: " + response)
+    AkkaHelper.actorFor(system, response)
+  }
+}