| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.openwhisk.core.controller |
| |
| import scala.concurrent.Future |
| import scala.concurrent.duration._ |
| import scala.util.{Failure, Success, Try} |
| import org.apache.kafka.common.errors.RecordTooLargeException |
| import akka.actor.ActorSystem |
| import akka.http.scaladsl.model.StatusCodes._ |
| import akka.http.scaladsl.server.RequestContext |
| import akka.http.scaladsl.server.RouteResult |
| import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ |
| import akka.http.scaladsl.unmarshalling._ |
| import spray.json._ |
| import spray.json.DefaultJsonProtocol._ |
| import org.apache.openwhisk.common.TransactionId |
| import org.apache.openwhisk.core.{FeatureFlags, WhiskConfig} |
| import org.apache.openwhisk.core.controller.RestApiCommons.{ListLimit, ListSkip} |
| import org.apache.openwhisk.core.controller.actions.PostActionActivation |
| import org.apache.openwhisk.core.database.{ActivationStore, CacheChangeNotification, NoDocumentException} |
| import org.apache.openwhisk.core.entitlement._ |
| import org.apache.openwhisk.core.entity._ |
| import org.apache.openwhisk.core.entity.types.EntityStore |
| import org.apache.openwhisk.http.ErrorResponse.terminate |
| import org.apache.openwhisk.http.Messages |
| import org.apache.openwhisk.http.Messages._ |
| import org.apache.openwhisk.core.entitlement.Resource |
| import org.apache.openwhisk.core.entitlement.Collection |
| import org.apache.openwhisk.core.loadBalancer.LoadBalancerException |
| |
/**
 * Singleton companion to the actions API. It declares the configuration properties the API
 * requires and hosts the annotation helper shared by action creation and update.
 */
object WhiskActionsApi {

  /** Properties that must be present in a configuration to implement the actions API. */
  def requiredProperties = Map(WhiskConfig.actionSequenceMaxLimit -> null)

  /**
   * Amends the annotations of an action being created or updated with system defined values:
   *  1. [[Annotations.ProvideApiKeyAnnotationName]] with the value false, added only when `create`
   *     is true, the corresponding feature flag is enabled and the annotation is not already defined;
   *  2. an exec annotation consistent with the action kind; this annotation is always added and
   *     overrides a pre-existing value.
   *
   * @param annotations the annotations supplied for the action
   * @param exec the exec of the action, used to derive the exec annotation
   * @param create true when the action is newly created, false when it is updated
   * @return the amended annotations
   */
  protected[core] def amendAnnotations(annotations: Parameters, exec: Exec, create: Boolean = true): Parameters = {
    // The api key annotation is defaulted for newly created actions only, since adding it on
    // update can break existing actions created before the annotation was introduced.
    val defaultApiKeyAnnotation = create && FeatureFlags.requireApiKeyAnnotation

    val baseAnnotations =
      if (!defaultApiKeyAnnotation) {
        annotations
      } else {
        annotations.get(Annotations.ProvideApiKeyAnnotationName) match {
          case Some(_) => annotations
          case None    => annotations ++ Parameters(Annotations.ProvideApiKeyAnnotationName, JsFalse)
        }
      }

    baseAnnotations ++ execAnnotation(exec)
  }

  /**
   * Builds the "exec" annotation carrying the action kind. The information duplicates the exec
   * kind stored in WhiskAction, but some clients fetch action lists and cannot determine the
   * kinds without fetching every action. Including the exec in the action list "view" would
   * require an API change, hence the annotation.
   */
  private def execAnnotation(exec: Exec): Parameters =
    Parameters(WhiskAction.execFieldName, exec.kind)
}
| |
| /** A trait implementing the actions API. */ |
| trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with ReferencedEntities { |
| services: WhiskServices => |
| |
/** The collection served by this API, constructed from `Collection.ACTIONS`. */
protected override val collection = Collection(Collection.ACTIONS)

/** An actor system for time-based futures. */
protected implicit val actorSystem: ActorSystem

/** Database service to CRUD actions (also used in this trait to fetch packages). */
protected val entityStore: EntityStore

/** Notification service for cache invalidation. */
protected implicit val cacheChangeNotification: Some[CacheChangeNotification]

/** Database service to get activations. */
protected val activationStore: ActivationStore
| |
| /** Entity normalizer to JSON object. */ |
| import RestApiCommons.emptyEntityToJsObject |
| |
| /** JSON response formatter. */ |
| import RestApiCommons.jsonDefaultResponsePrinter |
| |
/**
 * Routes operations on action resources. The following cases are handled:
 *
 * 1. ns/foo -> subject must be authorized for one of { action(ns, *), action(ns, foo) },
 *    resource resolves to { action(ns, foo) }
 *
 * 2. ns/bar/foo -> where bar is a package
 *    subject must be authorized for one of { package(ns, *), package(ns, bar), action(ns.bar, foo) }
 *    resource resolves to { action(ns.bar, foo) }
 *
 * 3. ns/baz/foo -> where baz is a binding to ns'.bar
 *    subject must be authorized for one of { package(ns, *), package(ns, baz) }
 *    *and* one of { package(ns', *), package(ns', bar), action(ns'.bar, foo) }
 *    resource resolves to { action(ns'.bar, foo) }
 *
 * Since package(ns, xyz) == action(ns.xyz, *), a subject with rights to package(ns, xyz) also has
 * rights to action(ns.xyz, *): sharing is done at the package level and is not more granular,
 * hence a check on action(ns.xyz, abc) is eschewed.
 *
 * Only list is supported for these resources:
 *
 * 4. ns/bar/ -> where bar is a package
 *    subject must be authorized for one of { package(ns, *), package(ns, bar) }
 *    resource resolves to { action(ns.bar, *) }
 *
 * 5. ns/baz/ -> where baz is a binding to ns'.bar
 *    subject must be authorized for one of { package(ns, *), package(ns, baz) }
 *    *and* one of { package(ns', *), package(ns', bar) }
 *    resource resolves to { action(ns'.bar, *) }
 */
protected override def innerRoutes(user: Identity, ns: EntityPath)(implicit transid: TransactionId) = {
  (entityPrefix & entityOps & requestMethod) { (segment, httpMethod) =>
    entityname(segment) { outername =>
      // /namespace/collection/name:
      // an action in the default package, authorize and dispatch
      val defaultPackageAction = pathEnd {
        authorizeAndDispatch(httpMethod, user, Resource(ns, collection, Some(outername)))
      }

      // GET /namespace/collection/package-name/:
      // list all actions in the package iff the subject is entitled to READ the package
      val packageListing = (get & pathSingleSlash) {
        val packageToRead = Resource(ns, Collection(Collection.PACKAGES), Some(outername))
        onComplete(entitlementProvider.check(user, Privilege.READ, packageToRead)) {
          case Failure(f) => super.handleEntitlementFailure(f)
          case Success(_) => listPackageActions(user, FullyQualifiedEntityName(ns, EntityName(outername)))
        }
      }

      // /namespace/collection/package-name/action-name:
      // an action in a named package
      val packagedAction = (entityPrefix & pathEnd) { innerSegment =>
        entityname(innerSegment) { innername =>
          val packageDocId = FullyQualifiedEntityName(ns, EntityName(outername)).toDocId
          val packageResource = Resource(ns.addPath(EntityName(outername)), collection, Some(innername))
          val right = collection.determineRight(httpMethod, Some(innername))

          // READ and ACTIVATE are dispatched whether or not the package is a binding; every other
          // right is an operation on the packaged action itself and is refused on a binding
          val permittedOnBinding = right == Privilege.READ || right == Privilege.ACTIVATE

          onComplete(entitlementProvider.check(user, right, packageResource)) {
            case Failure(f) => super.handleEntitlementFailure(f)
            case Success(_) =>
              getEntity(WhiskPackage.get(entityStore, packageDocId), Some { (wp: WhiskPackage) =>
                if (!permittedOnBinding && wp.binding.isDefined) {
                  terminate(BadRequest, Messages.notAllowedOnBinding)
                } else {
                  dispatchOp(user, right, Resource(wp.fullPath, collection, Some(innername)))
                }
              })
          }
        }
      }

      defaultPackageAction ~ packageListing ~ packagedAction
    }
  }
}
| |
/**
 * Creates an action, or updates it if it already exists. The PUT content is deserialized into a
 * WhiskActionPut, a subset of WhiskAction that omits the namespace (derived from the authenticated
 * user) and the entity name (derived from the URI). On update the WhiskActionPut is merged with the
 * WhiskAction in the datastore: values defined in the PUT content override the stored ones and
 * values not defined in the PUT content keep their stored value.
 *
 * Before the entity is written, the subject must hold READ rights on any entities referenced by
 * the exec, and the exec itself must pass the entitlement check.
 *
 * Responses are one of (Code, Message)
 * - 200 WhiskAction as JSON
 * - 400 Bad Request
 * - 409 Conflict
 * - 500 Internal Server Error
 */
override def create(user: Identity, entityName: FullyQualifiedEntityName)(implicit transid: TransactionId) = {
  parameter('overwrite ? false) { overwrite =>
    entity(as[WhiskActionPut]) { content =>
      val resolvedContent = content.resolve(user.namespace)

      // referenced entities are checked against the resolved content, the exec against the raw content
      val entitlementChecks =
        entitleReferencedEntities(user, Privilege.READ, resolvedContent.exec)
          .flatMap(_ => entitlementProvider.check(user, content.exec))

      onComplete(entitlementChecks) {
        case Failure(f) =>
          super.handleEntitlementFailure(f)
        case Success(_) =>
          putEntity(
            WhiskAction,
            entityStore,
            entityName.toDocId,
            overwrite,
            update(user, resolvedContent) _,
            () => make(user, entityName, resolvedContent))
      }
    }
  }
}
| |
/**
 * Invokes an action if it exists. The POST content is deserialized into an optional JSON object
 * payload and the activation is posted to the loadbalancer.
 *
 * Query parameters: `blocking` (default false), `result` (default false) and `timeout`, which
 * defaults to the configured maximum wait for a blocking activation.
 *
 * Responses are one of (Code, Message)
 * - 200 Activation as JSON if blocking or just the result JSON iff '&result=true'
 * - 202 ActivationId as JSON (this is issued on non-blocking activation or blocking activation that times out)
 * - 400 Bad Request (the payload attempts to override a final parameter)
 * - 404 Not Found
 * - 502 Bad Gateway
 * - 500 Internal Server Error
 */
override def activate(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])(
  implicit transid: TransactionId) = {
  parameter(
    'blocking ? false,
    'result ? false,
    'timeout.as[FiniteDuration] ? controllerActivationConfig.maxWaitForBlockingActivation) {
    (blocking, result, waitOverride) =>
      entity(as[Option[JsObject]]) { payload =>
        getEntity(WhiskActionMetaData.resolveActionAndMergeParameters(entityStore, entityName), Some {
          (fetched: WhiskActionMetaData) =>
            // resolve the action --- special case for sequences that may contain components
            // with '_' as default package
            val action = fetched.resolve(user.namespace)

            onComplete(entitleReferencedEntitiesMetaData(user, Privilege.ACTIVATE, Some(action.exec))) {
              case Failure(f) =>
                super.handleEntitlementFailure(f)

              case Success(_) =>
                val actionWithMergedParams = env match {
                  case Some(inherited) => action.inherit(inherited)
                  case None            => action
                }

                // incoming parameters may not override final parameters (i.e., parameters with
                // already defined values) on an action once its parameters are resolved across
                // package and binding
                val overridesFinalParameter = payload.exists { body =>
                  body.fields.keySet.exists(key => actionWithMergedParams.immutableParameters.contains(key))
                }

                if (overridesFinalParameter) {
                  terminate(BadRequest, Messages.parametersNotAllowed)
                } else {
                  doInvoke(user, actionWithMergedParams, payload, blocking, waitOverride, result)
                }
            }
        })
      }
  }
}
| |
/**
 * Invokes the action and translates the outcome into an HTTP response.
 *
 * - a queued activation (non-blocking, or blocking that did not complete in time) yields 202
 *   with the activation id
 * - a completed activation yields 200 on success, 502 on application or container error and
 *   500 otherwise; the body is the result JSON when `result` is true, else the extended activation
 * - a payload that is too large yields 413, a RejectRequest yields its own code and message,
 *   a loadbalancer failure yields 503 and any other failure yields 500
 *
 * @param user the subject invoking the action
 * @param actionWithMergedParams the action to invoke, with parameters already merged
 * @param payload the optional request payload
 * @param blocking whether to wait for the activation to complete
 * @param waitOverride how long to wait for a blocking activation
 * @param result whether to respond with the activation result only
 */
private def doInvoke(user: Identity,
                     actionWithMergedParams: WhiskActionMetaData,
                     payload: Option[JsObject],
                     blocking: Boolean,
                     waitOverride: FiniteDuration,
                     result: Boolean)(implicit transid: TransactionId): RequestContext => Future[RouteResult] = {
  // only a blocking invoke waits for the response
  val waitForResponse = Some(waitOverride).filter(_ => blocking)

  onComplete(invokeAction(user, actionWithMergedParams, payload, waitForResponse, cause = None)) {
    case Success(Left(activationId)) =>
      // non-blocking invoke or blocking invoke which got queued instead
      respondWithActivationIdHeader(activationId) {
        complete(Accepted, activationId.toJsObject)
      }

    case Success(Right(activation)) =>
      val body = if (result) activation.resultAsJson else activation.toExtendedJson()
      val outcome = activation.response

      respondWithActivationIdHeader(activation.activationId) {
        if (outcome.isSuccess) {
          complete(OK, body)
        } else if (outcome.isApplicationError || outcome.isContainerError) {
          // An application error carries an 'error' property in the result and is reported with
          // the same status as a container error.
          // NOTE: the response status does not exist in the JSON object if ?result == true,
          // in which case clients must check whether 'error' is in the JSON.
          // PRESERVING OLD BEHAVIOR for application errors; to be addressed in a separate change.
          complete(BadGateway, body)
        } else {
          complete(InternalServerError, body)
        }
      }

    case Failure(_: RecordTooLargeException) =>
      logging.debug(this, "[POST] action payload was too large")
      terminate(PayloadTooLarge)

    case Failure(RejectRequest(code, message)) =>
      logging.debug(this, s"[POST] action rejected with code $code: $message")
      terminate(code, message)

    case Failure(e: LoadBalancerException) =>
      logging.error(this, s"[POST] failed in loadbalancer: ${e.getMessage}")
      terminate(ServiceUnavailable)

    case Failure(e) =>
      logging.error(this, s"[POST] action activation failed: ${e.getMessage}")
      terminate(InternalServerError)
  }
}
| |
/**
 * Deletes an action.
 *
 * Responses are one of (Code, Message)
 * - 200 WhiskAction as JSON
 * - 404 Not Found
 * - 409 Conflict
 * - 500 Internal Server Error
 */
override def remove(user: Identity, entityName: FullyQualifiedEntityName)(implicit transid: TransactionId) = {
  // the function applied to the stored action before deletion always completes successfully
  val alwaysProceed = (_: WhiskAction) => Future.successful(())
  deleteEntity(WhiskAction, entityStore, entityName.toDocId, alwaysProceed)
}
| |
/**
 * Gets an action. The action name is prefixed with the namespace to create the primary index key.
 * With the query parameter `code=false` the action is fetched as WhiskActionMetaData instead of
 * as a WhiskAction. In both cases the parameters in `env`, when defined, are inherited by the
 * returned action.
 *
 * Responses are one of (Code, Message)
 * - 200 WhiskAction as JSON
 * - 404 Not Found
 * - 500 Internal Server Error
 */
override def fetch(user: Identity, entityName: FullyQualifiedEntityName, env: Option[Parameters])(
  implicit transid: TransactionId) = {
  parameter('code ? true) { code =>
    if (code) {
      getEntity(WhiskAction.resolveActionAndMergeParameters(entityStore, entityName), Some {
        (action: WhiskAction) =>
          val mergedAction = env match {
            case Some(inherited) => action.inherit(inherited)
            case None            => action
          }
          complete(OK, mergedAction)
      })
    } else {
      getEntity(WhiskActionMetaData.resolveActionAndMergeParameters(entityStore, entityName), Some {
        (action: WhiskActionMetaData) =>
          val mergedAction = env match {
            case Some(inherited) => action.inherit(inherited)
            case None            => action
          }
          complete(OK, mergedAction)
      })
    }
  }
}
| |
/**
 * Gets all actions in a path, or only their count when the query parameter `count` is true.
 * The `skip` and `limit` query parameters default to the defaults of the collection.
 *
 * Responses are one of (Code, Message)
 * - 200 [] or [WhiskAction as JSON]
 * - 500 Internal Server Error
 */
override def list(user: Identity, namespace: EntityPath)(implicit transid: TransactionId) = {
  parameter(
    'skip.as[ListSkip] ? ListSkip(collection.defaultListSkip),
    'limit.as[ListLimit] ? ListLimit(collection.defaultListLimit),
    'count ? false) { (skip, limit, count) =>
    if (count) {
      countEntities {
        WhiskAction.countCollectionInNamespace(entityStore, namespace, skip.n)
      }
    } else {
      listEntities {
        WhiskAction
          .listCollectionInNamespace(entityStore, namespace, skip.n, limit.n, includeDocs = false)
          .map {
            // already JSON: passed through unchanged
            case Left(js) => js
            // entities: serialized to JSON
            case Right(actions) => actions.map(WhiskAction.serdes.write(_))
          }
      }
    }
  }
}
| |
/**
 * Replaces default namespaces in a vector of components from a sequence with the appropriate namespace.
 *
 * @param components the components of a sequence
 * @param user the subject whose namespace replaces the default namespace
 * @return the components with their paths resolved against the namespace of the user
 */
private def resolveDefaultNamespace(components: Vector[FullyQualifiedEntityName],
                                    user: Identity): Vector[FullyQualifiedEntityName] =
  components.map { component =>
    // components in the default namespace contain `_`, which is replaced here
    val resolvedPath = component.path.resolveNamespace(user.namespace)
    FullyQualifiedEntityName(resolvedPath, component.name)
  }
| |
/**
 * Replaces default namespaces in an action sequence with the appropriate namespace.
 *
 * @param seq the sequence whose components are resolved
 * @param user the subject whose namespace replaces the default namespace
 * @return a new sequence exec made of the resolved components
 */
private def resolveDefaultNamespace(seq: SequenceExec, user: Identity): SequenceExec =
  // components in the default namespace contain `_`, which is replaced by the vector overload
  new SequenceExec(resolveDefaultNamespace(seq.components, user))
| |
/**
 * Creates a WhiskAction instance from the PUT request, using default values for the limits,
 * version, publish flag, parameters and annotations that the content does not define.
 *
 * @param content the PUT content; its exec is expected to be defined (`.get` is applied to it)
 * @param entityName the fully qualified name of the action to create
 */
private def makeWhiskAction(content: WhiskActionPut, entityName: FullyQualifiedEntityName)(
  implicit transid: TransactionId) = {
  val exec = content.exec.get

  val limits = content.limits match {
    case Some(requested) =>
      ActionLimits(
        requested.timeout.getOrElse(TimeLimit()),
        requested.memory.getOrElse(MemoryLimit()),
        requested.logs.getOrElse(LogLimit()),
        requested.concurrency.getOrElse(ConcurrencyLimit()))
    case None => ActionLimits()
  }

  // This is temporary while we are making sequencing directly supported in the controller.
  // The parameter override allows this to work with Pipecode.code. Any parameters other
  // than the action sequence itself are discarded and have no effect.
  // Note: While changing the implementation of sequences, components now store the fully
  // qualified entity names (which loses the leading "/"). Adding it back while both versions
  // of the code are in place.
  val parameters = exec match {
    case seq: SequenceExec =>
      val componentNames = seq.components.map(_.qualifiedNameWithLeadingSlash.toJson)
      Parameters("_actions", JsArray(componentNames))
    case _ =>
      content.parameters.getOrElse(Parameters())
  }

  WhiskAction(
    entityName.path,
    entityName.name,
    exec,
    parameters,
    limits,
    content.version.getOrElse(SemVer()),
    content.publish.getOrElse(false),
    WhiskActionsApi.amendAnnotations(content.annotations.getOrElse(Parameters()), exec))
}
| |
/**
 * For a sequence action, gathers the referenced entities and authorizes access to them with
 * the given right (without throttling). For any other exec, or no exec at all, the check
 * succeeds immediately.
 */
private def entitleReferencedEntities(user: Identity, right: Privilege, exec: Option[Exec])(
  implicit transid: TransactionId) = {
  exec
    .collect {
      case seq: SequenceExec =>
        logging.debug(this, "checking if sequence components are accessible")
        entitlementProvider.check(user, right, referencedEntities(seq), noThrottle = true)
    }
    .getOrElse(Future.successful(true))
}
| |
/**
 * Metadata counterpart of `entitleReferencedEntities`: for a sequence action described by its
 * metadata, gathers the referenced entities and authorizes access to them with the given right
 * (without throttling). For any other exec, or no exec at all, the check succeeds immediately.
 *
 * The message is logged at debug level, consistent with `entitleReferencedEntities`; this method
 * is called for every activation and the message carries no information that warrants info level.
 */
private def entitleReferencedEntitiesMetaData(user: Identity, right: Privilege, exec: Option[ExecMetaDataBase])(
  implicit transid: TransactionId) = {
  exec match {
    case Some(seq: SequenceExecMetaData) =>
      logging.debug(this, "checking if sequence components are accessible")
      entitlementProvider.check(user, right, referencedEntities(seq), noThrottle = true)
    case _ => Future.successful(true)
  }
}
| |
/**
 * Creates a WhiskAction from PUT content, generating default values where necessary.
 *
 * The returned future fails with a RejectRequest (400) when the exec is undefined, when it
 * refers to a deprecated runtime, or when a sequence violates the sequence limits.
 */
private def make(user: Identity, entityName: FullyQualifiedEntityName, content: WhiskActionPut)(
  implicit transid: TransactionId) = {
  content.exec match {
    case Some(seq: SequenceExec) =>
      // check that the sequence conforms to max length and no recursion rules
      checkSequenceActionLimits(entityName, seq.components).map { _ =>
        makeWhiskAction(content.replace(seq), entityName)
      }

    case Some(deprecatedExec) if deprecatedExec.deprecated =>
      Future.failed(RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)))

    case Some(_) =>
      Future.successful(makeWhiskAction(content, entityName))

    case None =>
      Future.failed(RejectRequest(BadRequest, "exec undefined"))
  }
}
| |
/**
 * Updates a WhiskAction from PUT content, merging the old action where necessary.
 *
 * The returned future fails with a RejectRequest (400) when the new exec, or the existing exec
 * if the content defines none, refers to a deprecated runtime, or when a sequence violates the
 * sequence limits.
 */
private def update(user: Identity, content: WhiskActionPut)(action: WhiskAction)(implicit transid: TransactionId) = {
  content.exec match {
    case Some(seq: SequenceExec) =>
      // check that the sequence conforms to max length and no recursion rules
      val sequenceName = FullyQualifiedEntityName(action.namespace, action.name)
      checkSequenceActionLimits(sequenceName, seq.components).map { _ =>
        updateWhiskAction(content.replace(seq), action)
      }

    case Some(deprecatedExec) if deprecatedExec.deprecated =>
      Future.failed(RejectRequest(BadRequest, runtimeDeprecated(deprecatedExec)))

    case Some(_) =>
      Future.successful(updateWhiskAction(content, action))

    // no new exec: the existing one is retained and must not be deprecated either
    case None if action.exec.deprecated =>
      Future.failed(RejectRequest(BadRequest, runtimeDeprecated(action.exec)))

    case None =>
      Future.successful(updateWhiskAction(content, action))
  }
}
| |
/**
 * Updates a WhiskAction instance from the PUT request. Values that the content does not define
 * are taken from the existing action; the version defaults to the next patch version and the
 * result carries the revision of the existing action.
 *
 * @param content the PUT content
 * @param action the existing action
 */
private def updateWhiskAction(content: WhiskActionPut, action: WhiskAction)(implicit transid: TransactionId) = {
  val limits = content.limits match {
    case Some(requested) =>
      ActionLimits(
        requested.timeout.getOrElse(action.limits.timeout),
        requested.memory.getOrElse(action.limits.memory),
        requested.logs.getOrElse(action.limits.logs),
        requested.concurrency.getOrElse(action.limits.concurrency))
    case None => action.limits
  }

  val existingIsSequence = action.exec match {
    case _: SequenceExec => true
    case _               => false
  }

  // This is temporary while we are making sequencing directly supported in the controller.
  //  - Updating with a sequence overrides the parameters with the sequence components.
  //  - Updating with a non-sequence exec uses the parameters from the content; if these are not
  //    defined, the previous parameters are inherited unless the previous action was a sequence,
  //    because sequence parameters are special and should not leak to non-sequence actions.
  //  - Updating without a new exec preserves the previous parameters when the existing action is
  //    a sequence (regardless of the parameters in the content); otherwise the parameters come
  //    from the content or, failing that, from the previous values.
  // Note: While changing the implementation of sequences, components now store the fully
  // qualified entity names (which loses the leading "/"). Adding it back while both versions of
  // the code are in place. This will disappear completely once the version of sequences with
  // "pipe.js" is removed.
  val parameters = content.exec match {
    case Some(seq: SequenceExec) =>
      Parameters("_actions", JsArray(seq.components.map(c => JsString("/" + c.toString))))
    case Some(_) =>
      content.parameters.getOrElse(if (existingIsSequence) Parameters() else action.parameters)
    case None =>
      if (existingIsSequence) action.parameters // discard content.parameters
      else content.parameters.getOrElse(action.parameters)
  }

  val exec = content.exec.getOrElse(action.exec)

  WhiskAction(
    action.namespace,
    action.name,
    exec,
    parameters,
    limits,
    content.version.getOrElse(action.version.upPatch),
    content.publish.getOrElse(action.publish),
    WhiskActionsApi.amendAnnotations(content.annotations.getOrElse(action.annotations), exec, create = false))
    .revision[WhiskAction](action.docinfo.rev)
}
| |
/**
 * Lists the actions in a package or binding. The router authorizes the subject for the package
 * (for a binding, for both the binding and the referenced package) and this method is reached
 * to list the actions only if that authorization succeeded.
 *
 * Note that when listing actions in a binding, the namespace on the actions will be that
 * of the referenced package, not the binding.
 */
private def listPackageActions(user: Identity, pkgName: FullyQualifiedEntityName)(implicit transid: TransactionId) = {
  // The package is fetched to determine whether it is a package or a reference, which decides
  // the namespace to list.
  // NOTE: these fetches are redundant with those from the authorization and should hit the
  // cache to ameliorate the cost; this can be improved but requires communicating back from
  // the authorization service the resolved namespace
  getEntity(WhiskPackage.get(entityStore, pkgName.toDocId), Some { (wp: WhiskPackage) =>
    val resolvedNamespace = wp.binding match {
      case Some(target) =>
        logging.debug(this, s"list actions in package binding '${wp.name}' -> '$target'")
        target.namespace.addPath(target.name)
      case None =>
        logging.debug(this, s"list actions in package '${wp.name}'")
        pkgName.path.addPath(wp.name)
    }

    // list actions in resolved namespace
    list(user, resolvedNamespace)
  })
}
| |
/**
 * Checks that the sequence is not cyclic and that the number of atomic actions in the "inlined"
 * sequence does not exceed the maximum allowed.
 *
 * @param sequenceAction is the action sequence to check
 * @param components the components of the sequence
 * @return a future that completes when the sequence is valid, or fails with a RejectRequest (400)
 *         when it is too long, empty, cyclic or refers to a component that does not exist
 */
private def checkSequenceActionLimits(
  sequenceAction: FullyQualifiedEntityName,
  components: Vector[FullyQualifiedEntityName])(implicit transid: TransactionId): Future[Unit] = {
  // the length of the sequence itself is checked first; only then are all its actions
  // traversed, inlining any that are sequences
  val verification: Future[Unit] =
    if (components.size > actionSequenceLimit) {
      Future.failed(TooManyActionsInSequence())
    } else if (components.isEmpty) {
      Future.failed(NoComponentInSequence())
    } else {
      for {
        // resolve the action document id (if it's in a package/binding);
        // this assumes that entityStore is the same for actions and packages
        resolvedSeq <- WhiskAction.resolveAction(entityStore, sequenceAction)
        count <- countAtomicActionsAndCheckCycle(resolvedSeq, components)
      } yield {
        logging.debug(this, s"sequence '$sequenceAction' atomic action count $count")
        if (count > actionSequenceLimit) {
          throw TooManyActionsInSequence()
        }
      }
    }

  // translate the validation failures into rejections; any other failure is left untouched
  verification.recoverWith {
    case _: TooManyActionsInSequence => Future.failed(RejectRequest(BadRequest, sequenceIsTooLong))
    case _: NoComponentInSequence    => Future.failed(RejectRequest(BadRequest, sequenceNoComponent))
    case _: SequenceWithCycle        => Future.failed(RejectRequest(BadRequest, sequenceIsCyclic))
    case _: NoDocumentException      => Future.failed(RejectRequest(BadRequest, sequenceComponentNotFound))
  }
}
| |
/**
 * Counts the number of atomic actions in a sequence and checks for potential cycles. The latter
 * is done by inlining any sequence components that are themselves sequences and checking whether
 * there is a reference to the given original sequence.
 *
 * @param origSequence the original sequence that is updated/created which generated the checks
 * @param components the components of a sequence to check if they reference the original sequence
 * @return Future with the number of atomic actions in the current sequence or an appropriate
 *         error if there is a cycle or a non-existent action reference
 */
private def countAtomicActionsAndCheckCycle(
  origSequence: FullyQualifiedEntityName,
  components: Vector[FullyQualifiedEntityName])(implicit transid: TransactionId): Future[Int] = {

  // Counts the atomic actions a single component contributes (by "inlining" sequences), failing
  // if the component, once resolved wrt any package bindings, is the original sequence itself.
  def countComponent(component: FullyQualifiedEntityName): Future[Int] =
    WhiskAction.resolveAction(entityStore, component).flatMap { resolvedComponent =>
      if (origSequence == resolvedComponent) {
        // this can happen when updating an atomic action to become a sequence
        Future.failed(SequenceWithCycle())
      } else {
        // if the component does not exist, the future will fail with appropriate error
        WhiskAction.get(entityStore, resolvedComponent.toDocId).flatMap { wskComponent =>
          wskComponent.exec match {
            // sequence action: count the atomic actions it contains
            case SequenceExec(nestedComponents) =>
              countAtomicActionsAndCheckCycle(origSequence, nestedComponents)
            // atomic action: counts as one
            case _ => Future.successful(1)
          }
        }
      }
    }

  if (components.size > actionSequenceLimit) {
    Future.failed(TooManyActionsInSequence())
  } else {
    // collapse the per-component futures into one and sum up the individual counts
    Future.sequence(components.map(countComponent)).map(_.sum)
  }
}
| |
/**
 * Max atomic action count allowed for sequences. The value is read from the whisk configuration
 * and converted to an integer the first time it is accessed.
 */
private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt
| |
/**
 * Custom unmarshaller for the query parameter "timeout" of an activation. The value must be an
 * integer number of milliseconds, greater than zero and no larger than the configured maximum
 * wait for a blocking activation; anything else raises an IllegalArgumentException carrying the
 * invalid timeout message.
 */
implicit val stringToFiniteDuration: Unmarshaller[String, FiniteDuration] = {
  Unmarshaller.strict[String, FiniteDuration] { value =>
    val maxWait = controllerActivationConfig.maxWaitForBlockingActivation

    Try(value.toInt).toOption
      .filter(millis => millis > 0 && millis <= maxWait.toMillis)
      .map(_.milliseconds)
      .getOrElse(throw new IllegalArgumentException(Messages.invalidTimeout(maxWait)))
  }
}
| |
/** Custom unmarshaller for the query parameter "limit" of "list" operations on this collection. */
private implicit val stringToListLimit: Unmarshaller[String, ListLimit] = RestApiCommons.stringToListLimit(collection)

/** Custom unmarshaller for the query parameter "skip" of "list" operations on this collection. */
private implicit val stringToListSkip: Unmarshaller[String, ListSkip] = RestApiCommons.stringToListSkip(collection)
| |
| } |
| |
/** Raised when a sequence has more components, or more inlined atomic actions, than the configured limit. */
private case class TooManyActionsInSequence() extends IllegalArgumentException
/** Raised when a sequence has no components. */
private case class NoComponentInSequence() extends IllegalArgumentException
/** Raised when a sequence refers, directly or through nested sequences, to itself. */
private case class SequenceWithCycle() extends IllegalArgumentException