[New Scheduler]Implement FPCEntitlementProvider (#5029)
* Implement FPCEntitlementProvider
* Add throttling metric
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
index 4b33a4b..8092ad8 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
@@ -153,7 +153,7 @@
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))
private val messagingProvider = SpiLoader.get[MessagingProvider]
- private val eventProducer = messagingProvider.getProducer(this.config)
+ protected val eventProducer = messagingProvider.getProducer(this.config)
/**
* Grants a subject the right to access a resources.
@@ -201,6 +201,19 @@
.flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
}
+ /**
+ * Checks action activation rate throttles for an identity.
+ *
+ * @param user the identity to check rate throttles for
+ * @param right the privilege the subject is requesting
+ * @param resources the set of resource the subject requests access to
+ * @return a promise that completes with success iff the user is within their activation quota
+ */
+ protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
+ implicit transid: TransactionId): Future[Unit] = {
+ checkUserThrottle(user, right, resources).flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+ }
+
private val kindRestrictor = {
import pureconfig._
import pureconfig.generic.auto._
@@ -284,11 +297,7 @@
val entitlementCheck: Future[Unit] = if (user.rights.contains(right)) {
if (resources.nonEmpty) {
logging.debug(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(", ")}'")
- val throttleCheck =
- if (noThrottle) Future.successful(())
- else
- checkUserThrottle(user, right, resources)
- .flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+ val throttleCheck = if (noThrottle) Future.successful(()) else checkThrottles(user, right, resources)
throttleCheck
.flatMap(_ => checkPrivilege(user, right, resources))
.flatMap(checkedResources => {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala
new file mode 100644
index 0000000..9e89db2
--- /dev/null
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entitlement
+
+import scala.concurrent.Future
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes.TooManyRequests
+import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.{EventMessage, Metric}
+import org.apache.openwhisk.core.controller.RejectRequest
+import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancer
+
+protected[core] class FPCEntitlementProvider(
+ private val config: WhiskConfig,
+ private val loadBalancer: LoadBalancer,
+ private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+ extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) {
+
+ override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
+ implicit transid: TransactionId): Future[Unit] = {
+ if (right == ACTIVATE) {
+ val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res =>
+ loadBalancer.checkThrottle(user.namespace.name.toPath, res.fqname)
+ }
+ if (checks.contains(true)) {
+ val metric = Metric("ConcurrentRateLimit", 1)
+ UserEvents.send(
+ eventProducer,
+ EventMessage(
+ s"controller${controllerInstance.asString}",
+ metric,
+ user.subject,
+ user.namespace.name.toString,
+ user.namespace.uuid,
+ metric.typeName))
+ Future.failed(RejectRequest(TooManyRequests, "Too many requests"))
+ } else Future.successful(())
+ } else Future.successful(())
+ }
+
+}
+
+private object FPCEntitlementProvider extends EntitlementSpiProvider {
+
+ override def instance(config: WhiskConfig, loadBalancer: LoadBalancer, instance: ControllerInstanceId)(
+ implicit actorSystem: ActorSystem,
+ logging: Logging) =
+ new FPCEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer, instance)
+}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index 041ce0c..2022512 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -75,6 +75,9 @@
/** Gets the size of the cluster all loadbalancers are acting in */
def clusterSize: Int = 1
+
+ /** Gets the throttling for given action. */
+ def checkThrottle(namespace: EntityPath, action: String): Boolean = false
}
/**