[New Scheduler]Implement FPCEntitlementProvider (#5029)

* Implement FPCEntitlementProvider

* Add throttling metric
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
index 4b33a4b..8092ad8 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/Entitlement.scala
@@ -153,7 +153,7 @@
       activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations))
 
   private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val eventProducer = messagingProvider.getProducer(this.config)
+  protected val eventProducer = messagingProvider.getProducer(this.config)
 
   /**
    * Grants a subject the right to access a resources.
@@ -201,6 +201,19 @@
       .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user))
   }
 
+  /**
+   * Checks action activation rate throttles for an identity.
+   *
+   * @param user      the identity to check rate throttles for
+   * @param right     the privilege the subject is requesting
+   * @param resources the set of resource the subject requests access to
+   * @return a promise that completes with success iff the user is within their activation quota
+   */
+  protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
+    implicit transid: TransactionId): Future[Unit] = {
+    checkUserThrottle(user, right, resources).flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+  }
+
   private val kindRestrictor = {
     import pureconfig._
     import pureconfig.generic.auto._
@@ -284,11 +297,7 @@
     val entitlementCheck: Future[Unit] = if (user.rights.contains(right)) {
       if (resources.nonEmpty) {
         logging.debug(this, s"checking user '$subject' has privilege '$right' for '${resources.mkString(", ")}'")
-        val throttleCheck =
-          if (noThrottle) Future.successful(())
-          else
-            checkUserThrottle(user, right, resources)
-              .flatMap(_ => checkConcurrentUserThrottle(user, right, resources))
+        val throttleCheck = if (noThrottle) Future.successful(()) else checkThrottles(user, right, resources)
         throttleCheck
           .flatMap(_ => checkPrivilege(user, right, resources))
           .flatMap(checkedResources => {
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala
new file mode 100644
index 0000000..9e89db2
--- /dev/null
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/entitlement/FPCEntitlement.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.entitlement
+
+import scala.concurrent.Future
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.StatusCodes.TooManyRequests
+import org.apache.openwhisk.common.{Logging, TransactionId, UserEvents}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.connector.{EventMessage, Metric}
+import org.apache.openwhisk.core.controller.RejectRequest
+import org.apache.openwhisk.core.entitlement.Privilege.ACTIVATE
+import org.apache.openwhisk.core.entity.{ControllerInstanceId, Identity}
+import org.apache.openwhisk.core.loadBalancer.LoadBalancer
+
+protected[core] class FPCEntitlementProvider(
+  private val config: WhiskConfig,
+  private val loadBalancer: LoadBalancer,
+  private val controllerInstance: ControllerInstanceId)(implicit actorSystem: ActorSystem, logging: Logging)
+    extends LocalEntitlementProvider(config, loadBalancer, controllerInstance) {
+
+  override protected[core] def checkThrottles(user: Identity, right: Privilege, resources: Set[Resource])(
+    implicit transid: TransactionId): Future[Unit] = {
+    if (right == ACTIVATE) {
+      val checks = resources.filter(_.collection.path == Collection.ACTIONS).map { res =>
+        loadBalancer.checkThrottle(user.namespace.name.toPath, res.fqname)
+      }
+      if (checks.contains(true)) {
+        val metric = Metric("ConcurrentRateLimit", 1)
+        UserEvents.send(
+          eventProducer,
+          EventMessage(
+            s"controller${controllerInstance.asString}",
+            metric,
+            user.subject,
+            user.namespace.name.toString,
+            user.namespace.uuid,
+            metric.typeName))
+        Future.failed(RejectRequest(TooManyRequests, "Too many requests"))
+      } else Future.successful(())
+    } else Future.successful(())
+  }
+
+}
+
+private object FPCEntitlementProvider extends EntitlementSpiProvider {
+
+  override def instance(config: WhiskConfig, loadBalancer: LoadBalancer, instance: ControllerInstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging) =
+    new FPCEntitlementProvider(config: WhiskConfig, loadBalancer: LoadBalancer, instance)
+}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index 041ce0c..2022512 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -75,6 +75,9 @@
 
   /** Gets the size of the cluster all loadbalancers are acting in */
   def clusterSize: Int = 1
+
+  /** Gets the throttling for given action. */
+  def checkThrottle(namespace: EntityPath, action: String): Boolean = false
 }
 
 /**