blob: fbd2d76cc356ea122fd208a9c47110649c8e008f [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.test.http
import scala.concurrent.Await
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import akka.stream.scaladsl._
import akka.pattern.ask
import akka.pattern.pipe
import akka.util.Timeout
object RESTProxy {
// Orders the proxy to immediately unbind and rebind after the duration has passed.
case class UnbindFor(duration: FiniteDuration)
}
/**
* A simple REST proxy that can receive commands to change its behavior (e.g.
* simulate failures of the proxied service). Not for use in production.
*/
class RESTProxy(val host: String, val port: Int)(val serviceAuthority: Uri.Authority, val useHTTPS: Boolean) extends Actor with ActorLogging {
private implicit val actorSystem = context.system
private implicit val executionContex = actorSystem.dispatcher
private val destHost = serviceAuthority.host.address
private val destPort = serviceAuthority.port
// These change as connections come and go
private var materializer: Option[ActorMaterializer] = None
private var binding: Option[Http.ServerBinding] = None
// Public messages
import RESTProxy._
// Internal messages
private case object DoBind
private case object DoUnbind
private case class Request(request: HttpRequest)
// Route requests through messages to this actor, to serialize w.r.t events such as unbinding
private def mkRequestFlow(materializer: Materializer) : Flow[HttpRequest,HttpResponse,_] = {
implicit val m = materializer
Flow.apply[HttpRequest].mapAsync(4) { request =>
ask(self, Request(request))(timeout = Timeout(1.minute)).mapTo[HttpResponse]
}
}
private def bind(checkState: Boolean = true) : Unit = {
assert(!checkState || binding.isEmpty, "Proxy is already bound")
if(binding.isEmpty) {
assert(materializer.isEmpty)
implicit val m = ActorMaterializer()
materializer = Some(m)
log.debug(s"[RESTProxy] Binding to '$host:$port'.")
val b = Await.result(Http().bindAndHandle(mkRequestFlow(m), host, port), 5.seconds)
binding = Some(b)
}
}
private def unbind(checkState: Boolean = true) : Unit = {
assert(!checkState || binding.isDefined, "Proxy is not bound")
binding.foreach { b =>
log.debug(s"[RESTProxy] Unbinding from '${b.localAddress}'")
Await.result(b.unbind(), 5.seconds)
binding = None
assert(materializer.isDefined)
materializer.foreach { m =>
materializer = None
m.shutdown()
}
}
}
override def preStart() = bind()
override def postStop() = unbind(checkState = false)
override def receive = {
case UnbindFor(d) =>
self ! DoUnbind
actorSystem.scheduler.scheduleOnce(d, self, DoBind)
case DoUnbind =>
unbind(checkState = false)
case DoBind =>
bind(checkState = false)
case Request(request) =>
// If the actor isn't bound to the port / has no materializer,
// the request is simply dropped.
materializer.map { implicit m =>
log.debug(s"[RESTProxy] Proxying '${request.uri}' to '${serviceAuthority}'")
val flow = if(useHTTPS) {
Http().outgoingConnectionHttps(destHost, destPort)
} else {
Http().outgoingConnection(destHost, destPort)
}
// akka-http doesn't like us to set those headers ourselves.
val upstreamRequest = request.copy(
headers = request.headers.filter(_ match {
case `Timeout-Access`(_) => false
case _ => true
})
)
Source.single(upstreamRequest)
.via(flow)
.runWith(Sink.head)
.pipeTo(sender)
}
}
}