Use akka-http for requests from the invoker to the containers
This commit also introduces a new helper object, `NewHttpUtils` to make non-pooled, possibly retrying, HTTP requests using akka-http data representation.
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala b/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
index 4587fcb..6516c7b 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/Container.scala
@@ -46,12 +46,12 @@
val name = containerName.getOrElse("anon")
val dockerhost = pool.dockerhost
- val (containerId, containerIP) = bringup(containerName, image, network, env, args, limits, policy)
+ val (containerId, containerHostAndPort) = bringup(containerName, image, network, env, args, limits, policy)
def details: String = {
val name = containerName getOrElse "??"
val id = containerId getOrElse "??"
- val ip = containerIP getOrElse "??"
+ val ip = containerHostAndPort getOrElse "??"
s"container [$name] [$id] [$ip]"
}
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
index 8745ffc..6f852b1 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/ContainerUtils.scala
@@ -51,7 +51,7 @@
*/
def bringup(name: Option[String], image: String, network: String, env: Map[String, String], args: Array[String], limits: ActionLimits, policy: Option[String])(implicit transid: TransactionId): (ContainerId, ContainerIP) = {
val id = makeContainer(name, image, network, env, args, limits, policy)
- val host = if (id.isDefined) getContainerHost(name) else None
+ val host = id.flatMap(_ => getContainerHostAndPort(name))
(id, host)
}
@@ -155,7 +155,7 @@
}
- def getContainerHost(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
+ def getContainerHostAndPort(container: ContainerName)(implicit transid: TransactionId): ContainerIP = {
container map { name =>
runDockerCmd("inspect", "--format", "'{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}'", name) map {
output => appendPort(output.substring(1, output.length - 1))
diff --git a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
index 05225a8..aab28f4 100644
--- a/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/container/WhiskContainer.scala
@@ -22,16 +22,28 @@
import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
-import scala.util.Try
+
+import scala.concurrent.Await
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.ActorMaterializer
import spray.json.JsObject
import spray.json.JsString
-import whisk.common.HttpUtils
-import whisk.common.LoggingMarkers
-import whisk.common.PrintStreamEmitter
import whisk.common.TransactionId
import whisk.core.entity.ActionLimits
-import whisk.core.entity.ActivationResponse
+import whisk.common.LoggingMarkers
+import whisk.common.PrintStreamEmitter
+import whisk.common.NewHttpUtils
/**
* Reifies a whisk container - one that respects the whisk container API.
@@ -79,13 +91,13 @@
/**
* Sends initialization payload to container.
*/
- def init(args: JsObject)(implicit transid: TransactionId): RunResult = {
+ def init(args: JsObject)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
// this shouldn't be needed but leave it for now
if (isBlackbox) Thread.sleep(3000)
info(this, s"sending initialization to ${this.details}")
// when invoking /init, don't wait longer than the timeout configured for this action
val timeout = initTimeoutMilli min limits.timeout.duration
- val result = sendPayload("/init", JsObject("value" -> args), timeout.toMillis.toInt) // This will retry.
+ val result = sendPayload("/init", JsObject("value" -> args), timeout) // This will retry.
info(this, s"initialization result: ${result}")
result
}
@@ -96,7 +108,7 @@
* @param state the value of the status to compare the actual state against
* @return triple of start time, end time, response for user action.
*/
- def run(args: JsObject, meta: JsObject, authKey: String, timeout: Int, actionName: String, activationId: String)(implicit transid: TransactionId): RunResult = {
+ def run(args: JsObject, meta: JsObject, authKey: String, timeout: FiniteDuration, actionName: String, activationId: String)(implicit system: ActorSystem, transid: TransactionId): RunResult = {
val startMarker = transid.started("Invoker", LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName $details")
val result = sendPayload("/run", JsObject(meta.fields + ("value" -> args) + ("authKey" -> JsString(authKey))), timeout)
// Use start and end time of the activation
@@ -108,10 +120,10 @@
/**
* An alternative entry point for direct testing of action container.
*/
- def run(payload: String, activationId: String): RunResult = {
+ def run(payload: String, activationId: String)(implicit system: ActorSystem): RunResult = {
val params = JsObject("payload" -> JsString(payload))
val meta = JsObject("activationId" -> JsString(activationId))
- run(params, meta, "no_auth_key", 30000, "no_action", "no_activation_id")(TransactionId.testing)
+ run(params, meta, "no_auth_key", 30000.milliseconds, "no_action", "no_activation_id")(system, TransactionId.testing)
}
/**
@@ -127,27 +139,59 @@
* @param msg the message to post
* @return response from container if any as array of byte
*/
- private def sendPayload(endpoint: String, msg: JsObject, timeout: Int): RunResult = {
+ private def sendPayload(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): RunResult = {
+ import system.dispatcher
+
val start = ContainerCounter.now()
- val result = containerIP map { host =>
- try {
- val connection = HttpUtils.makeHttpClient(timeout, true)
- val http = new HttpUtils(connection, host)
- val (code, bytes) = http.dopost(endpoint, msg, Map(), timeout)
- Try { connection.close() }
- val returnCode = if (code == -1) ActivationResponse.ContainerError else code
- Some(returnCode, new String(bytes, "UTF-8"))
- } catch {
- case t: Throwable => {
- warn(this, s"Exception while posting to action container ${t.getMessage}")
- None
- }
- }
- } getOrElse None
+
+ val f = sendPayloadAsync(endpoint, msg, timeout)
+
+ f.onFailure {
+ case t: Throwable =>
+ warn(this, s"Exception while posting to action container ${t.getMessage}")
+ }
+
+ Await.ready(f, Duration.Inf) // OK, since there is a built-in timeout.
+
val end = ContainerCounter.now()
- (start, end, result)
+
+ val r = f.value.get.toOption.flatten
+ (start, end, r)
}
+ /**
+ * Asynchronously posts a message to the container.
+ *
+ * @param msg the message to post
+ * @return response from the container if any
+ */
+ private def sendPayloadAsync(endpoint: String, msg: JsObject, timeout: FiniteDuration)(implicit system: ActorSystem): Future[Option[(Int, String)]] = {
+ implicit val ec = system.dispatcher
+ implicit val materializer = ActorMaterializer()
+
+ containerHostAndPort map { hp =>
+ val host = hp.split(":")(0)
+ val port = hp.split(":")(1).toInt
+
+ val flow = Http().outgoingConnection(host, port)
+
+ val uri = Uri(
+ scheme = "http",
+ authority = Uri.Authority(host = Uri.Host(host), port = port),
+ path = Uri.Path(endpoint))
+
+ for (
+ entity <- Marshal(msg).to[MessageEntity];
+ request = HttpRequest(method = HttpMethods.POST, uri = uri, entity = entity);
+ response <- NewHttpUtils.singleRequest(request, timeout, retryOnTCPErrors = true, retryInterval = 100.milliseconds);
+ responseBody <- Unmarshal(response.entity).to[String]
+ ) yield {
+ Some((response.status.intValue, responseBody))
+ }
+ } getOrElse {
+ Future.successful(None)
+ }
+ }
}
/**
diff --git a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
index 6909725..9eb59bd 100644
--- a/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/dispatcher/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -270,15 +270,15 @@
pool.getAction(action, auth) match {
case Some((con, initResultOpt)) => Future {
val params = con.mergeParams(payload)
- val timeoutMillis = action.limits.timeout.millis
+ val timeout = action.limits.timeout.duration
initResultOpt match {
- case None => (false, con.run(params, msg.meta, auth.compact, timeoutMillis,
+ case None => (false, con.run(params, msg.meta, auth.compact, timeout,
action.fullyQualifiedName, msg.activationId.toString)) // cached
case Some((start, end, Some((200, _)))) => { // successful init
// TODO: @perryibm update comment if this is still necessary else remove
Thread.sleep(if (con.isBlackbox) BlackBoxSlack else RegularSlack)
tran.initInterval = Some(start, end)
- (false, con.run(params, msg.meta, auth.compact, timeoutMillis,
+ (false, con.run(params, msg.meta, auth.compact, timeout,
action.fullyQualifiedName, msg.activationId.toString))
}
case _ => (true, initResultOpt.get) // unsuccessful initialization