[GEARPUMP-355] Fix YarnAppMaster address resolution in a kerberized H…

…adoop/Yarn set-up

Author: Timea Magyar <Timea.Magyar@etas.com>

Closes #231 from titikakatoo/yarn_spnego_authentication.
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
index 1907a95..53e93f9 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/appmaster/YarnAppMaster.scala
@@ -18,14 +18,10 @@
 
 package org.apache.gearpump.experiments.yarn.appmaster
 
-import java.io.IOException
 import java.util.concurrent.TimeUnit
-
 import akka.actor._
 import akka.util.Timeout
 import com.typesafe.config.ConfigValueFactory
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.ClusterConfig
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
@@ -35,7 +31,6 @@
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util._
 import org.slf4j.Logger
-
 import scala.collection.JavaConverters._
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
@@ -364,22 +359,8 @@
   case class WorkerInfo(id: ContainerId, nodeId: NodeId)
 
   def getAppMaster(report: ApplicationReport, system: ActorSystem): ActorRef = {
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
-    val get = new GetMethod(appMasterPath)
-    var status = client.executeMethod(get)
+    import org.apache.gearpump.experiments.yarn.client.AppMasterResolver
 
-    if (status != 200) {
-      // Sleeps a little bit, and try again
-      Thread.sleep(3000)
-      status = client.executeMethod(get)
-    }
-
-    if (status == 200) {
-      AkkaHelper.actorFor(system, get.getResponseBodyAsString)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getTrackingURL} is accessible...")
-    }
+    AppMasterResolver.resolveAppMasterAddress(report, system)
   }
 }
\ No newline at end of file
diff --git a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
index 90653e1..c05b4e2 100644
--- a/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
+++ b/experiments/yarn/src/main/scala/org/apache/gearpump/experiments/yarn/client/AppMasterResolver.scala
@@ -19,14 +19,15 @@
 package org.apache.gearpump.experiments.yarn.client
 
 import java.io.IOException
-
+import java.net.{HttpURLConnection, URL}
+import java.nio.charset.StandardCharsets
 import akka.actor.{ActorRef, ActorSystem}
-import org.apache.commons.httpclient.HttpClient
-import org.apache.commons.httpclient.methods.GetMethod
-import org.apache.gearpump.experiments.yarn.glue.Records.ApplicationId
+import org.apache.commons.io.IOUtils
+import org.apache.gearpump.experiments.yarn.glue.Records.{ApplicationId, ApplicationReport}
 import org.apache.gearpump.experiments.yarn.glue.YarnClient
 import org.apache.gearpump.util.{AkkaHelper, LogUtil}
-
+import org.apache.hadoop.hdfs.web.URLConnectionFactory
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 import scala.util.Try
 
 /**
@@ -43,19 +44,8 @@
 
   private def connect(appId: ApplicationId): ActorRef = {
     val report = yarnClient.getApplicationReport(appId)
-    val client = new HttpClient()
-    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
-    LOG.info(s"appMasterPath=$appMasterPath")
-    val get = new GetMethod(appMasterPath)
-    val status = client.executeMethod(get)
-    if (status == 200) {
-      val response = get.getResponseBodyAsString
-      LOG.info("Successfully resolved AppMaster address: " + response)
-      AkkaHelper.actorFor(system, response)
-    } else {
-      throw new IOException("Fail to resolve AppMaster address, please make sure " +
-        s"${report.getTrackingURL} is accessible...")
-    }
+
+    AppMasterResolver.resolveAppMasterAddress(report, system)
   }
 
   private def retry(fun: => ActorRef, times: Int): ActorRef = {
@@ -75,3 +65,39 @@
     result
   }
 }
+
+object AppMasterResolver {
+  val LOG = LogUtil.getLogger(getClass)
+
+  def resolveAppMasterAddress(report: ApplicationReport, system: ActorSystem): ActorRef = {
+    val appMasterPath = s"${report.getTrackingURL}/supervisor-actor-path"
+    LOG.info(s"appMasterPath=$appMasterPath")
+
+    val connectionFactory: URLConnectionFactory = URLConnectionFactory
+      .newDefaultURLConnectionFactory(new YarnConfiguration())
+    val url: URL = new URL(appMasterPath)
+    val connection: HttpURLConnection = connectionFactory.openConnection(url)
+      .asInstanceOf[HttpURLConnection]
+    connection.setInstanceFollowRedirects(true)
+
+    try {
+      connection.connect()
+    } catch {
+      case e: IOException =>
+        LOG.error(s"Failed to connect to AppMaster" + e.getMessage)
+    }
+
+    val status = connection.getResponseCode
+    if (status == 200) {
+      val stream: java.io.InputStream = connection.getInputStream
+      val response = IOUtils.toString(stream, StandardCharsets.UTF_8)
+      LOG.info("Successfully resolved AppMaster address: " + response)
+      connection.disconnect()
+      AkkaHelper.actorFor(system, response)
+    } else {
+      connection.disconnect()
+      throw new IOException("Fail to resolve AppMaster address, please make sure " +
+        s"${report.getTrackingURL} is accessible...")
+    }
+  }
+}