blob: b5b7776936d01659991c35e3657f8b09571a0b59 [file] [log] [blame]
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.core.dispatcher
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
import scala.util.Success
import scala.util.matching.Regex.Match
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import whisk.common.Counter
import whisk.common.Logging
import whisk.common.TransactionId
import whisk.core.connector.{ ActivationMessage => Message }
import whisk.core.connector.MessageConsumer
import whisk.core.invoker.InvokerService
import akka.event.Logging.LogLevel
object Dispatcher extends Logging {

    /**
     * Process entry point. The first command line argument, trimmed and
     * lower-cased, selects the application to launch; the complete argument
     * array is forwarded to that application unchanged.
     *
     * Only "invoker" is recognized. Any other value (including a missing
     * argument, which is treated as the empty name) is reported as an error.
     *
     * @param args the command line arguments; args(0) names the application
     */
    def main(args: Array[String]): Unit = {
        val app = args.headOption.map(_.trim.toLowerCase()).getOrElse("")
        if (app == "invoker") InvokerService.main(args)
        else error(Dispatcher, s"unrecognized app $app")
    }
}
/**
 * Creates a dispatcher that pulls messages from the message pub/sub connector.
 * This is currently used by invoker only. It may be removed in the future and
 * its functionality merged directly with the invoker. The current model allows
 * for different message types to be received by more than one consumer in the
 * same process (via handler registration).
 *
 * @param verbosity level for logging
 * @param consumer the consumer providing messages
 * @param pollDuration the long poll duration (max duration to wait for new messages)
 * @param maxPipelineDepth the maximum number of messages allowed in the queue (even, >= 2)
 * @param actorSystem an actor system used to create the feed actor and to run handlers
 */
@throws[IllegalArgumentException]
class Dispatcher(
    verbosity: LogLevel,
    consumer: MessageConsumer,
    pollDuration: FiniteDuration,
    maxPipelineDepth: Int,
    actorSystem: ActorSystem)
    extends Registrar
    with Logging {

    setVerbosity(verbosity)

    // Tracks the number of handler invocations currently in flight. It is
    // initialized before the feed actor is created because the feed is handed
    // `process`, which reaches this counter through handleMessage.
    private val counter = new Counter()

    /** The actor that feeds messages from the consumer into `process`. */
    val activationFeed = actorSystem.actorOf(Props(new ActivationFeed(this: Logging, consumer, maxPipelineDepth, pollDuration, process)))

    /** Asks the activation feed to start filling its queue with messages. */
    def start() = activationFeed ! ActivationFeed.FillQueueWithMessages

    /** Closes the underlying message consumer. */
    def stop() = consumer.close()

    /**
     * Consumes messages from the bus using a streaming consumer
     * interface. Each message is a JSON object with at least these properties:
     * { path: the topic name,
     *   payload: the message body }
     *
     * Expected topics are "/whisk/invoke[0..n-1]" (handled by Invoker).
     * Expected paths "actions/invoke" (handled by Invoker).
     *
     * The paths should generally mirror the REST API.
     *
     * The raw bytes are decoded as UTF-8 and parsed into a message. If parsing
     * fails, the failure is logged and the message is dropped. Otherwise, when
     * the message path is non-empty, the path is matched against every handler
     * registered via addHandler (and not yet removed via removeHandler), one
     * handler after another on the calling thread. A handler whose matcher
     * yields no matches is skipped; for every other handler the handling of the
     * message is wrapped in a Future.
     *
     * @param topic the topic the message was received on
     * @param bytes the raw message, expected to be UTF-8 encoded JSON
     */
    def process(topic: String, bytes: Array[Byte]) = {
        // The Charset overload decodes exactly like the "utf-8" name lookup but
        // cannot fail with an unsupported encoding.
        val raw = new String(bytes, java.nio.charset.StandardCharsets.UTF_8)
        Message(raw) match {
            case Success(m) =>
                implicit val tid = m.transid
                if (m.path.nonEmpty) inform(handlers) foreach {
                    case (_, handler) =>
                        val matches = handler.matches(m.path)
                        handleMessage(handler, topic, m, matches)
                }
            case Failure(t) => info(this, errorMsg(raw, t))
        }
    }

    /**
     * Runs a handler against a message if the handler matched the message path.
     * The handler is invoked inside a Future on the actor system's dispatcher.
     * The active counter is incremented just before the handler is invoked and
     * decremented once the future returned by the handler completes, or when
     * the invocation itself fails.
     *
     * @param rule the handler to apply
     * @param topic the topic the message was received on
     * @param msg the parsed message
     * @param matches the matches of the handler against the message path; nothing happens if empty
     */
    private def handleMessage(rule: DispatchRule, topic: String, msg: Message, matches: Seq[Match]) = {
        implicit val tid = msg.transid
        implicit val executionContext = actorSystem.dispatcher
        if (matches.nonEmpty) {
            Future {
                val count = counter.next()
                debug(this, s"activeCount = $count while handling ${rule.name}")
                rule.doit(topic, msg, matches) // returns a future which is flattened so onComplete observes its result
            }.flatMap(identity).onComplete {
                case Success(_) => debug(this, s"activeCount = ${counter.prev()} after handling $rule")
                case Failure(t) => error(this, s"activeCount = ${counter.prev()} ${errorMsg(rule, t)}")
            }
        }
    }

    /**
     * Logs the names of the given handlers and returns the handlers unchanged.
     *
     * @param matchers the registered handlers, keyed by name
     * @return the same map that was passed in
     */
    private def inform(matchers: TrieMap[String, DispatchRule])(implicit transid: TransactionId) = {
        // mkString tolerates an empty map; reduce would throw when no handler is registered.
        val names = matchers.map(_._2.name).mkString(",")
        debug(this, s"matching message to ${matchers.size} handlers: $names")
        matchers
    }

    /** Formats a failure raised while applying a handler. */
    private def errorMsg(handler: DispatchRule, e: Throwable): String =
        s"failed applying handler '${handler.name}': ${errorMsg(e)}"

    /** Formats a failure raised while processing a raw message. */
    private def errorMsg(msg: String, e: Throwable): String =
        s"failed processing message: $msg $e${e.getStackTrace.mkString("", " ", "")}"

    /**
     * Formats an exception followed by a stack trace. For an ExecutionException
     * the stack trace of its cause is used; if it has no cause, the exception's
     * own stack trace is used instead.
     */
    private def errorMsg(e: Throwable): String = {
        val traced = e match {
            // getCause may be null, so guard against it rather than risk an NPE while logging
            case wrapped: java.util.concurrent.ExecutionException => Option(wrapped.getCause).getOrElse(wrapped)
            case other => other
        }
        s"$e${traced.getStackTrace.mkString("", " ", "")}"
    }
}
trait Registrar {

    /** The registered handlers, keyed by handler name. */
    protected val handlers = TrieMap.empty[String, DispatchRule]

    /**
     * Registers a message handler under its name. A null or invalid handler
     * is ignored.
     *
     * When replace is true the handler is stored unconditionally, taking the
     * place of any handler already registered under the same name. When
     * replace is false the handler is stored only if no handler is registered
     * under that name yet.
     *
     * @param handler is the message handler to add
     * @param replace indicates whether a new handler should replace an older handler by the same name
     * @return the handler previously registered under the same name, if any
     */
    def addHandler(handler: DispatchRule, replace: Boolean): Option[DispatchRule] =
        Option(handler).filter(_.isValid).flatMap { rule =>
            if (replace) handlers.put(rule.name, rule)
            else handlers.putIfAbsent(rule.name, rule)
        }

    /**
     * Removes the handler registered under the given name, if there is one.
     * A null or blank name is ignored.
     *
     * @param name is the name of the handler to remove
     * @return the handler just removed if any
     */
    def removeHandler(name: String): Option[DispatchRule] =
        Option(name).filter(_.trim.nonEmpty).flatMap(key => handlers.remove(key))

    /**
     * Removes the handler registered under the name of the given handler, if
     * there is one. A null or invalid handler is ignored.
     *
     * @param handler is the message handler to remove
     * @return the handler just removed if any
     */
    def removeHandler(handler: DispatchRule): Option[DispatchRule] =
        Option(handler).filter(_.isValid).flatMap(rule => handlers.remove(rule.name))
}