Use akka-http for requests from the invoker to the containers

This commit also introduces a new helper object, `NewHttpUtils` to make non-pooled, possibly retrying, HTTP requests using akka-http data representation.
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala b/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
index 4587fcb..6516c7b 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
@@ -46,12 +46,12 @@
     val name = containerName.getOrElse("anon")
     val dockerhost = pool.dockerhost
 
-    val (containerId, containerIP) = bringup(containerName, image, network, env, args, limits, policy)
+    val (containerId, containerHostAndPort) = bringup(containerName, image, network, env, args, limits, policy)
 
     def details: String = {
         val name = containerName getOrElse "??"
         val id = containerId getOrElse "??"
-        val ip = containerIP getOrElse "??"
+        val ip = containerHostAndPort getOrElse "??"
         s"container [$name] [$id] [$ip]"
     }
 
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
index 8745ffc..6f852b1 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
@@ -51,7 +51,7 @@
      */
     def bringup(name: Option[String], image: String, network: String, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
         val id = makeContainer(name, image, network, env, args, limits, policy)
-        val host = if (id.isDefined) getContainerHost(name) else None
+        val host = id.flatMap(_ => getContainerHostAndPort(name))
         (id, host)
     }
 
@@ -155,7 +155,7 @@
 
     }
 
-    def getContainerHost(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
+    def getContainerHostAndPort(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
         container map { name =>
             runDockerCmd("inspect", "--format", "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'", name) map {
                 output => appendPort(output.substring(1, output.length - 1))
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
index 05225a8..aab28f4 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -22,16 +22,28 @@
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-import scala.util.Try
+
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.ActorMaterializer
 
 import spray.json.JsObject
 import spray.json.JsString
-import whisk.common.HttpUtils
-import whisk.common.LoggingMarkers
-import whisk.common.PrintStreamEmitter
 import whisk.common.TransactionId
 import whisk.core.entity.ActionLimits
-import whisk.core.entity.ActivationResponse
+import whisk.common.LoggingMarkers
+import whisk.common.PrintStreamEmitter
+import whisk.common.NewHttpUtils
 
 /**
  * Reifies a whisk container - one that respects the whisk container API.
@@ -79,13 +91,13 @@
     /**
      * Sends initialization payload to container.
      */
-    def init(args: JsObject)(implicit transid: TransactionId): RunResult = {
+    def init(args: JsObject)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
         // this shouldn't be needed but leave it for now
         if (isBlackbox) Thread.sleep(3000)
         info(this, s"sending initialization to ${this.details}")
         // when invoking /init, don't wait longer than the timeout configured for this action
         val timeout = initTimeoutMilli min limits.timeout.duration
-        val result = sendPayload("/init", JsObject("value" -> args), timeout.toMillis.toInt) // This will retry.
+        val result = sendPayload("/init", JsObject("value" -> args), timeout) // This will retry.
         info(this, s"initialization result: ${result}")
         result
     }
@@ -96,7 +108,7 @@
      * @param state the value of the status to compare the actual state against
      * @return triple of start time, end time, response for user action.
      */
-    def run(args: JsObject, meta: JsObject, authKey: String, timeout: Int, actionName: String, activationId: String)(implicit transid: TransactionId): RunResult = {
+    def run(args: JsObject, meta: JsObject, authKey: String, timeout: FiniteDuration, actionName: String, activationId: String)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
         val startMarker = transid.started("Invoker", LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName $details")
         val result = sendPayload("/run", JsObject(meta.fields + ("value" -> args) + ("authKey" -> JsString(authKey))), timeout)
         // Use start and end time of the activation
@@ -108,10 +120,10 @@
     /**
      * An alternative entry point for direct testing of action container.
      */
-    def run(payload: String, activationId: String): RunResult = {
+    def run(payload: String, activationId: String)(implicit system: ActorSystem): RunResult = {
         val params = JsObject("payload" -> JsString(payload))
         val meta = JsObject("activationId" -> JsString(activationId))
-        run(params, meta, "no_auth_key", 30000, "no_action", "no_activation_id")(TransactionId.testing)
+        run(params, meta, "no_auth_key", 30000.milliseconds, "no_action", "no_activation_id")(system, TransactionId.testing)
     }
 
     /**
@@ -127,27 +139,59 @@
      * @param msg the message to post
      * @return response from container if any as array of byte
      */
-    private def sendPayload(endpoint: String, msg: JsObject, timeout: Int): RunResult = {
+    private def sendPayload(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): RunResult = {
+        import system.dispatcher
+
         val start = ContainerCounter.now()
-        val result = containerIP map { host =>
-            try {
-                val connection = HttpUtils.makeHttpClient(timeout, true)
-                val http = new HttpUtils(connection, host)
-                val (code, bytes) = http.dopost(endpoint, msg, Map(), timeout)
-                Try { connection.close() }
-                val returnCode = if (code == -1) ActivationResponse.ContainerError else code
-                Some(returnCode, new String(bytes, "UTF-8"))
-            } catch {
-                case t: Throwable => {
-                    warn(this, s"Exception while posting to action container ${t.getMessage}")
-                    None
-                }
-            }
-        } getOrElse None
+
+        val f = sendPayloadAsync(endpoint, msg, timeout)
+
+        f.onFailure {
+            case t: Throwable =>
+                warn(this, s"Exception while posting to action container ${t.getMessage}")
+        }
+
+        Await.ready(f, Duration.Inf) // OK, since there is a built-in timeout.
+
         val end = ContainerCounter.now()
-        (start, end, result)
+
+        val r = f.value.get.toOption.flatten
+        (start, end, r)
     }
 
+    /**
+     * Asynchronously posts a message to the container.
+     *
+     *  @param msg the message to post
+     *  @return response from the container if any
+     */
+    private def sendPayloadAsync(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Option[(Int, String)]] = {
+        implicit val ec = system.dispatcher
+        implicit val materializer = ActorMaterializer()
+
+        containerHostAndPort map { hp =>
+            val host = hp.split(":")(0)
+            val port = hp.split(":")(1).toInt
+
+            val flow = Http().outgoingConnection(host, port)
+
+            val uri = Uri(
+                scheme = "http",
+                authority = Uri.Authority(host = Uri.Host(host), port = port),
+                path = Uri.Path(endpoint))
+
+            for (
+                entity <- Marshal(msg).to[MessageEntity];
+                request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity);
+                response <- NewHttpUtils.singleRequest(request, timeout, retryOnTCPErrors = true, retryInterval = 100.milliseconds);
+                responseBody <- Unmarshal(response.entity).to[String]
+            ) yield {
+                Some((response.status.intValue, responseBody))
+            }
+        } getOrElse {
+            Future.successful(None)
+        }
+    }
 }
 
 /**
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
index 6909725..9eb59bd 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -270,15 +270,15 @@
         pool.getAction(action, auth) match {
             case Some((con, initResultOpt)) => Future {
                 val params = con.mergeParams(payload)
-                val timeoutMillis = action.limits.timeout.millis
+                val timeout = action.limits.timeout.duration
                 initResultOpt match {
-                    case None => (false, con.run(params, msg.meta, auth.compact, timeoutMillis,
+                    case None => (false, con.run(params, msg.meta, auth.compact, timeout,
                         action.fullyQualifiedName, msg.activationId.toString)) // cached
                     case Some((start, end, Some((200, _)))) => { // successful init
                         // TODO:  @perryibm update comment if this is still necessary else remove
                         Thread.sleep(if (con.isBlackbox) BlackBoxSlack else RegularSlack)
                         tran.initInterval = Some(start, end)
-                        (false, con.run(params, msg.meta, auth.compact, timeoutMillis,
+                        (false, con.run(params, msg.meta, auth.compact, timeout,
                             action.fullyQualifiedName, msg.activationId.toString))
                     }
                     case _ => (true, initResultOpt.get) //  unsuccessful initialization