blob: c70caee170504968377095e02cbaf4f9302e0b69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.openwhisk.core.controller
import java.time.{Clock, Instant}
import scala.collection.immutable.Map
import scala.concurrent.Future
import scala.util.Try
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.HttpMethods.POST
import akka.http.scaladsl.model.StatusCodes.{Accepted, BadRequest, InternalServerError, NoContent, OK, ServerError}
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.headers.Authorization
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.{RequestContext, RouteResult}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import spray.json.DefaultJsonProtocol._
import pureconfig._
import pureconfig.generic.auto._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{Https, TransactionId}
import org.apache.openwhisk.core.controller.RestApiCommons.{ListLimit, ListSkip}
import org.apache.openwhisk.core.database.{ActivationStore, CacheChangeNotification}
import org.apache.openwhisk.core.entitlement.Collection
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.types.EntityStore
import org.apache.openwhisk.http.ErrorResponse
import org.apache.openwhisk.http.Messages
import org.apache.openwhisk.core.database.UserContext
/** A trait implementing the triggers API. */
trait WhiskTriggersApi extends WhiskCollectionAPI {
services: WhiskServices =>
protected override val collection = Collection(Collection.TRIGGERS)
/** An actor system for timed based futures. */
protected implicit val actorSystem: ActorSystem
/** Database service to CRUD triggers. */
protected val entityStore: EntityStore
/** Connection context for HTTPS */
protected lazy val httpsConnectionContext = {
val httpsConfig = loadConfigOrThrow[HttpsConfig]("whisk.controller.https")
Https.connectionContextClient(httpsConfig, true)
}
protected val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol")
/**
* Sends a request either over http or https depending on the configuration
* @param request http request to send
* @return http response packed in a future
*/
private def singleRequest(request: HttpRequest): Future[HttpResponse] = {
if (controllerProtocol == "https")
Http().singleRequest(request, connectionContext = httpsConnectionContext)
else
Http().singleRequest(request)
}
/** Notification service for cache invalidation. */
protected implicit val cacheChangeNotification: Some[CacheChangeNotification]
/** Database service to get activations. */
protected val activationStore: ActivationStore
/** JSON response formatter. */
/** Path to Triggers REST API. */
protected val triggersPath = "triggers"
protected val url = Uri(s"${controllerProtocol}://localhost:${whiskConfig.servicePort}")
import RestApiCommons.emptyEntityToJsObject
/**
* Creates or updates trigger if it already exists. The PUT content is deserialized into a WhiskTriggerPut
* which is a subset of WhiskTrigger (it eschews the namespace and entity name since the former is derived
* from the authenticated user and the latter is derived from the URI). The WhiskTriggerPut is merged with
* the existing WhiskTrigger in the datastore, overriding old values with new values that are defined.
* Any values not defined in the PUT content are replaced with old values.
*
* Responses are one of (Code, Message)
* - 200 WhiskAction as JSON
* - 400 Bad Request
* - 409 Conflict
* - 500 Internal Server Error
*/
override def create(user: Identity, entityName: FullyQualifiedEntityName)(implicit transid: TransactionId) = {
parameter('overwrite ? false) { overwrite =>
entity(as[WhiskTriggerPut]) { content =>
putEntity(WhiskTrigger, entityStore, entityName.toDocId, overwrite, update(content) _, () => {
create(content, entityName)
}, postProcess = Some { trigger =>
completeAsTriggerResponse(trigger)
})
}
}
}
/**
* Fires trigger if it exists. The POST content is deserialized into a Payload and posted
* to the loadbalancer.
*
* Responses are one of (Code, Message)
* - 200 ActivationId as JSON
* - 404 Not Found
* - 500 Internal Server Error
*/
override def activate(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])(
implicit transid: TransactionId) = {
extractRequest { request =>
val context = UserContext(user, request)
entity(as[Option[JsObject]]) { payload =>
getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some {
trigger: WhiskTrigger =>
// List of active rules associated with the trigger
val activeRules: Map[FullyQualifiedEntityName, ReducedRule] =
trigger.rules.map(_.filter(_._2.status == Status.ACTIVE)).getOrElse(Map.empty)
if (activeRules.nonEmpty) {
val triggerActivationId = activationIdFactory.make()
logging.info(this, s"[POST] trigger activation id: ${triggerActivationId}")
val triggerActivation = WhiskActivation(
namespace = user.namespace.name.toPath, // all activations should end up in the one space regardless trigger.namespace
entityName.name,
user.subject,
triggerActivationId,
Instant.now(Clock.systemUTC()),
Instant.EPOCH,
response = ActivationResponse.success(payload orElse Some(JsObject.empty)),
version = trigger.version,
duration = None)
val args: JsObject = trigger.parameters.merge(payload).getOrElse(JsObject.empty)
activateRules(user, args, trigger.rules.getOrElse(Map.empty))
.map(results => triggerActivation.withLogs(ActivationLogs(results.map(_.toJson.compactPrint).toVector)))
.recover {
case e =>
logging.error(this, s"Failed to write action activation results to trigger activation: $e")
triggerActivation
}
.map { activation =>
activationStore.storeAfterCheck(activation, false, None, context)
}
respondWithActivationIdHeader(triggerActivationId) {
complete(Accepted, triggerActivationId.toJsObject)
}
} else {
logging
.debug(
this,
s"[POST] trigger without an active rule was activated; no trigger activation record created for $entityName")
complete(NoContent)
}
})
}
}
}
/**
* Deletes trigger.
*
* Responses are one of (Code, Message)
* - 200 WhiskTrigger as JSON
* - 404 Not Found
* - 409 Conflict
* - 500 Internal Server Error
*/
override def remove(user: Identity, entityName: FullyQualifiedEntityName)(implicit transid: TransactionId) = {
deleteEntity(
WhiskTrigger,
entityStore,
entityName.toDocId,
(t: WhiskTrigger) => Future.successful({}),
postProcess = Some { trigger =>
completeAsTriggerResponse(trigger)
})
}
/**
* Gets trigger. The trigger name is prefixed with the namespace to create the primary index key.
*
* Responses are one of (Code, Message)
* - 200 WhiskTrigger has JSON
* - 404 Not Found
* - 500 Internal Server Error
*/
override def fetch(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])(
implicit transid: TransactionId) = {
getEntity(WhiskTrigger.get(entityStore, entityName.toDocId), Some { trigger =>
completeAsTriggerResponse(trigger)
})
}
/**
* Gets all triggers in namespace.
*
* Responses are one of (Code, Message)
* - 200 [] or [WhiskTrigger as JSON]
* - 500 Internal Server Error
*/
override def list(user: Identity, namespace: EntityPath)(implicit transid: TransactionId) = {
parameter(
'skip.as[ListSkip] ? ListSkip(collection.defaultListSkip),
'limit.as[ListLimit] ? ListLimit(collection.defaultListLimit),
'count ? false) { (skip, limit, count) =>
if (!count) {
listEntities {
WhiskTrigger.listCollectionInNamespace(entityStore, namespace, skip.n, limit.n, includeDocs = false) map {
list =>
list.fold((js) => js, (ts) => ts.map(WhiskTrigger.serdes.write(_)))
}
}
} else {
countEntities {
WhiskTrigger.countCollectionInNamespace(entityStore, namespace, skip.n)
}
}
}
}
/** Creates a WhiskTrigger from PUT content, generating default values where necessary. */
private def create(content: WhiskTriggerPut, triggerName: FullyQualifiedEntityName)(
implicit transid: TransactionId): Future[WhiskTrigger] = {
val newTrigger = WhiskTrigger(
triggerName.path,
triggerName.name,
content.parameters getOrElse Parameters(),
content.limits getOrElse TriggerLimits(),
content.version getOrElse SemVer(),
content.publish getOrElse false,
content.annotations getOrElse Parameters())
validateTriggerFeed(newTrigger)
}
/** Updates a WhiskTrigger from PUT content, merging old trigger where necessary. */
private def update(content: WhiskTriggerPut)(trigger: WhiskTrigger)(
implicit transid: TransactionId): Future[WhiskTrigger] = {
val newTrigger = WhiskTrigger(
trigger.namespace,
trigger.name,
content.parameters getOrElse trigger.parameters,
content.limits getOrElse trigger.limits,
content.version getOrElse trigger.version.upPatch,
content.publish getOrElse trigger.publish,
content.annotations getOrElse trigger.annotations,
trigger.rules).revision[WhiskTrigger](trigger.docinfo.rev)
Future.successful(newTrigger)
}
/**
* Validates a trigger feed annotation.
* A trigger feed must be a valid entity name, e.g., one of 'namespace/package/name'
* or 'namespace/name', or just 'name'.
*
* TODO: check if the feed actually exists. This is deferred because the macro
* operation of creating a trigger and initializing the feed is handled as one
* atomic operation in the CLI and the UI. At some point these may be promoted
* to a single atomic operation in the controller; at which point, validating
* the trigger feed should execute the action (verifies it is a valid name that
* the subject is entitled to) and iff that succeeds will the trigger be created
* or updated.
*/
private def validateTriggerFeed(trigger: WhiskTrigger)(implicit transid: TransactionId) = {
trigger.annotations.get(Parameters.Feed) map {
case JsString(f) if (EntityPath.validate(f)) =>
Future successful trigger
case _ =>
Future failed {
RejectRequest(BadRequest, "Feed name is not valid")
}
} getOrElse {
Future successful trigger
}
}
/**
* Completes an HTTP request with a WhiskRule including the computed Status
*
* @param rule the rule to send
* @param status the status to include in the response
*/
private def completeAsTriggerResponse(trigger: WhiskTrigger): RequestContext => Future[RouteResult] = {
complete(OK, trigger)
}
/**
* Iterates through each rule and invoking each active rule's mapped action.
*/
private def activateRules(user: Identity,
args: JsObject,
rulesToActivate: Map[FullyQualifiedEntityName, ReducedRule])(
implicit transid: TransactionId): Future[Iterable[RuleActivationResult]] = {
val ruleResults = rulesToActivate.map {
case (ruleName, rule) if (rule.status != Status.ACTIVE) =>
Future.successful {
RuleActivationResult(
ActivationResponse.ApplicationError,
ruleName,
rule.action,
Left(Messages.triggerWithInactiveRule(ruleName.asString, rule.action.asString)))
}
case (ruleName, rule) =>
// Invoke the action. Retain action results for inclusion in the trigger activation record
postActivation(user, rule, args)
.flatMap { response =>
response.status match {
case OK | Accepted =>
Unmarshal(response.entity).to[JsObject].map { activationResponse =>
val activationId = activationResponse.fields("activationId").convertTo[ActivationId]
logging.debug(this, s"trigger-fired action '${rule.action}' invoked with activation $activationId")
RuleActivationResult(ActivationResponse.Success, ruleName, rule.action, Right(activationId))
}
case code =>
Unmarshal(response.entity).to[String].map { error =>
val failureType = code match {
case _: ServerError => ActivationResponse.WhiskError // all 500s are to be considered whisk errors
case _ => ActivationResponse.ApplicationError
}
val errorMessage: String = Try(error.parseJson.convertTo[ErrorResponse])
.map { e =>
def logMsg = s"trigger-fired action '${rule.action}' failed to invoke with ${e.error}, ${e.code}"
if (failureType == ActivationResponse.ApplicationError) logging.debug(this, logMsg)
else logging.error(this, logMsg)
e.error
}
.getOrElse {
logging
.error(this, s"trigger-fired action '${rule.action}' failed to invoke with status code $code")
InternalServerError.defaultMessage
}
RuleActivationResult(failureType, ruleName, rule.action, Left(errorMessage))
}
}
}
.recover {
case t =>
logging.error(this, s"trigger-fired action '${rule.action}' failed to invoke with $t")
RuleActivationResult(
ActivationResponse.WhiskError,
ruleName,
rule.action,
Left(InternalServerError.defaultMessage))
}
}
Future.sequence(ruleResults)
}
/**
* Posts an action activation. Currently done by posting internally to the controller.
* TODO: use a proper path that does not route through HTTP.
*
* @param rule the name of the rule that is activated
* @param args the arguments to post to the action
* @return a future with the HTTP response from the action activation
*/
private def postActivation(user: Identity, rule: ReducedRule, args: JsObject)(
implicit transid: TransactionId): Future[HttpResponse] = {
// Build the url to invoke an action mapped to the rule
val actionUrl = baseControllerPath / rule.action.path.root.asString / "actions"
val actionPath = rule.action.path.relativePath
.map(pkg => Path / pkg.namespace / rule.action.name.asString)
.getOrElse(Path / rule.action.name.asString)
user.authkey.getCredentials
.map { creds =>
val request = HttpRequest(
method = POST,
uri = url.withPath(actionUrl ++ actionPath),
headers = List(Authorization(creds), transid.toHeader),
entity = HttpEntity(MediaTypes.`application/json`, args.compactPrint))
singleRequest(request)
}
.getOrElse(Future.failed(new NoCredentialsAvailable()))
}
/** Contains the result of invoking a rule */
case class RuleActivationResult(statusCode: Int,
ruleName: FullyQualifiedEntityName,
actionName: FullyQualifiedEntityName,
response: Either[String, ActivationId]) {
def toJson: JsObject =
JsObject(
Map(
"rule" -> ruleName.asString.toJson,
"action" -> actionName.asString.toJson,
"statusCode" -> statusCode.toJson,
"success" -> (statusCode == ActivationResponse.Success).toJson,
response.fold("error" -> _.toJson, "activationId" -> _.toJson)))
}
/** Common base bath for the controller, used by internal action activation mechanism. */
private val baseControllerPath = Path / "api" / "v1" / "namespaces"
/** Custom unmarshaller for query parameters "limit" for "list" operations. */
private implicit val stringToListLimit: Unmarshaller[String, ListLimit] = RestApiCommons.stringToListLimit(collection)
/** Custom unmarshaller for query parameters "skip" for "list" operations. */
private implicit val stringToListSkip: Unmarshaller[String, ListSkip] = RestApiCommons.stringToListSkip(collection)
private case class NoCredentialsAvailable() extends IllegalArgumentException
}