Add prefix for topics (#5062)

- Add prefix for topics
- Add extra prefix for userEvent topic only
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 6e8a861..6851c70 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -139,6 +139,8 @@
   confdir: "{{ config_root_dir }}/registry"
 
 kafka:
+  topicsPrefix: "{{ kafka_topics_prefix | default('') }}"
+  topicsUserEventPrefix: "{{ kafka_topics_userEvent_prefix | default(kafka_topics_prefix) | default('') }}"
   ssl:
     client_authentication: required
     keystore:
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 64724c4..575aaf4 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -176,6 +176,10 @@
         "{{ kafka_topics_health_retentionMS | default() }}"
       "CONFIG_whisk_kafka_topics_health_segmentBytes":
         "{{ kafka_topics_health_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_prefix":
+        "{{ kafka.topicsPrefix }}"
+      "CONFIG_whisk_kafka_topics_userEvent_prefix":
+        "{{ kafka.topicsUserEventPrefix }}"
       "CONFIG_whisk_kafka_common_securityProtocol":
         "{{ kafka.protocol }}"
       "CONFIG_whisk_kafka_common_sslTruststoreLocation":
diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml
index 8c3c027..fe79439 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -216,6 +216,8 @@
       "CONFIG_whisk_kafka_topics_invoker_retentionBytes": "{{ kafka_topics_invoker_retentionBytes | default() }}"
       "CONFIG_whisk_kafka_topics_invoker_retentionMs": "{{ kafka_topics_invoker_retentionMS | default() }}"
       "CONFIG_whisk_kakfa_topics_invoker_segmentBytes": "{{ kafka_topics_invoker_segmentBytes | default() }}"
+      "CONFIG_whisk_kafka_topics_prefix": "{{ kafka.topicsPrefix }}"
+      "CONFIG_whisk_kafka_topics_userEvent_prefix": "{{ kafka.topicsUserEventPrefix }}"
       "CONFIG_whisk_kafka_common_securityProtocol": "{{ kafka.protocol }}"
       "CONFIG_whisk_kafka_common_sslTruststoreLocation": "/conf/{{ kafka.ssl.keystore.name }}"
       "CONFIG_whisk_kafka_common_sslTruststorePassword": "{{ kafka.ssl.keystore.password }}"
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index a894360..230e16d 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -193,6 +193,10 @@
                 retention-bytes = 1073741824
                 retention-ms    = 3600000
             }
+            prefix = ""
+            user-event {
+                prefix = ""
+            }
         }
 
         metrics {
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
index 5417cfd..f3cdf00 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/UserEvents.scala
@@ -28,9 +28,11 @@
 
   val enabled = loadConfigOrThrow[UserEventsConfig](ConfigKeys.userEvents).enabled
 
+  val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
+
   def send(producer: MessageProducer, em: => EventMessage) = {
     if (enabled) {
-      producer.send("events", em)
+      producer.send(userEventTopicPrefix + "events", em)
     }
   }
 }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 19ad39d..1058e53 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -212,6 +212,8 @@
   val kafkaProducer = s"$kafka.producer"
   val kafkaConsumer = s"$kafka.consumer"
   val kafkaTopics = s"$kafka.topics"
+  val kafkaTopicsPrefix = s"$kafkaTopics.prefix"
+  val kafkaTopicsUserEventPrefix = s"$kafkaTopics.user-event.prefix"
 
   val memory = "whisk.memory"
   val timeLimit = "whisk.time-limit"
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
index eb9cce9..b798d88 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/ack/MessagingActiveAck.scala
@@ -19,9 +19,10 @@
 
 import org.apache.kafka.common.errors.RecordTooLargeException
 import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.ConfigKeys
 import org.apache.openwhisk.core.connector.{AcknowledegmentMessage, EventMessage, MessageProducer}
 import org.apache.openwhisk.core.entity._
-
+import pureconfig._
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.{Failure, Success}
 
@@ -29,6 +30,9 @@
   implicit logging: Logging,
   ec: ExecutionContext)
     extends ActiveAck {
+
+  private val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
   override def apply(tid: TransactionId,
                      activationResult: WhiskActivation,
                      blockingInvoke: Boolean,
@@ -38,7 +42,7 @@
     implicit val transid: TransactionId = tid
 
     def send(msg: AcknowledegmentMessage, recovery: Boolean = false) = {
-      producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
+      producer.send(topic = topicPrefix + "completed" + controllerInstance.asString, msg).andThen {
         case Success(_) =>
           val info = if (recovery) s"recovery ${msg.messageType}" else msg.messageType
           logging.info(this, s"posted $info of activation ${acknowledegment.activationId}")
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
index c8c432b..c5b5e01 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/RemoteCacheInvalidation.scala
@@ -24,12 +24,11 @@
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import akka.actor.ActorSystem
 import akka.actor.Props
 import spray.json._
 import org.apache.openwhisk.common.Logging
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.connector.Message
 import org.apache.openwhisk.core.connector.MessageFeed
 import org.apache.openwhisk.core.connector.MessagingProvider
@@ -41,6 +40,7 @@
 import org.apache.openwhisk.core.entity.WhiskRule
 import org.apache.openwhisk.core.entity.WhiskTrigger
 import org.apache.openwhisk.spi.SpiLoader
+import pureconfig._
 
 case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message {
   override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint
@@ -101,5 +101,6 @@
 }
 
 object RemoteCacheInvalidation {
-  val cacheInvalidationTopic = "cacheInvalidation"
+  val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+  val cacheInvalidationTopic = topicPrefix + "cacheInvalidation"
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
index 9352196..a8ead9d 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala
@@ -32,7 +32,7 @@
 import spray.json._
 import org.apache.openwhisk.common.Https.HttpsConfig
 import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
-import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.core.connector.MessagingProvider
 import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
 import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -187,6 +187,9 @@
   protected val interface = loadConfigOrThrow[String]("whisk.controller.interface")
   protected val readinessThreshold = loadConfig[Double]("whisk.controller.readiness-fraction")
 
+  val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+  val userEventTopicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsUserEventPrefix)
+
   // requiredProperties is a Map whose keys define properties that must be bound to
   // a value, and whose values are default values.   A null value in the Map means there is
   // no default value specified, so it must appear in the properties file
@@ -263,10 +266,10 @@
     val msgProvider = SpiLoader.get[MessagingProvider]
 
     Seq(
-      ("completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
-      ("health", "health", None),
-      ("cacheInvalidation", "cache-invalidation", None),
-      ("events", "events", None)).foreach {
+      (topicPrefix + "completed" + instance.asString, "completed", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      (topicPrefix + "health", "health", None),
+      (topicPrefix + "cacheInvalidation", "cache-invalidation", None),
+      (userEventTopicPrefix + "events", "events", None)).foreach {
       case (topic, topicConfigurationKey, maxMessageBytes) =>
         if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
           abort(s"failure during msgProvider.ensureTopic for topic $topic")
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 3d0cc2f..7820207 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -30,6 +30,7 @@
 import org.apache.openwhisk.common.LoggingMarkers._
 import org.apache.openwhisk.common._
 import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
@@ -177,7 +178,7 @@
                                         invoker: InvokerInstanceId): Future[RecordMetadata] = {
     implicit val transid: TransactionId = msg.transid
 
-    val topic = s"invoker${invoker.toInt}"
+    val topic = s"${Controller.topicPrefix}invoker${invoker.toInt}"
 
     MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START)
     val start = transid.started(
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
index 2022512..ee839b3 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala
@@ -23,8 +23,10 @@
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
 import org.apache.openwhisk.core.connector._
+import org.apache.openwhisk.core.controller.Controller
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.spi.Spi
+
 import scala.concurrent.duration._
 
 /**
@@ -94,7 +96,7 @@
   def createFeedFactory(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(implicit actorSystem: ActorSystem,
                                                                                   logging: Logging): FeedFactory = {
 
-    val activeAckTopic = s"completed${instance.asString}"
+    val activeAckTopic = s"${Controller.topicPrefix}completed${instance.asString}"
     val maxActiveAcksPerPoll = 128
     val activeAckPollDuration = 1.second
 
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 28c1a83..14d3ff4 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -36,6 +36,7 @@
 import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size.SizeLong
 import org.apache.openwhisk.common.LoggingMarkers._
+import org.apache.openwhisk.core.controller.Controller
 import org.apache.openwhisk.core.loadBalancer.InvokerState.{Healthy, Offline, Unhealthy, Unresponsive}
 import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
 import org.apache.openwhisk.spi.SpiLoader
@@ -352,7 +353,11 @@
           InvokerPool.props(
             (f, i) => f.actorOf(InvokerActor.props(i, instance)),
             (m, i) => sendActivationToInvoker(messagingProducer, m, i),
-            messagingProvider.getConsumer(whiskConfig, s"health${instance.asString}", "health", maxPeek = 128),
+            messagingProvider.getConsumer(
+              whiskConfig,
+              s"${Controller.topicPrefix}health${instance.asString}",
+              s"${Controller.topicPrefix}health",
+              maxPeek = 128),
             monitor))
       }
 
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
index 3d9cc46..65083b1 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala
@@ -73,6 +73,8 @@
 
   protected val protocol = loadConfigOrThrow[String]("whisk.invoker.protocol")
 
+  val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
   /**
    * An object which records the environment variables required for this component to run.
    */
@@ -175,7 +177,7 @@
     initKamon(assignedInvokerId)
 
     val topicBaseName = "invoker"
-    val topicName = topicBaseName + assignedInvokerId
+    val topicName = topicPrefix + topicBaseName + assignedInvokerId
 
     val maxMessageBytes = Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)
     val invokerInstance =
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
index db0dfb4..7ddff8d 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
@@ -120,7 +120,7 @@
   }
 
   /** Initialize message consumers */
-  private val topic = s"invoker${instance.toInt}"
+  private val topic = s"${Invoker.topicPrefix}invoker${instance.toInt}"
   private val maximumContainers = (poolConfig.userMemory / MemoryLimit.MIN_MEMORY).toInt
   private val msgProvider = SpiLoader.get[MessagingProvider]
 
@@ -296,7 +296,7 @@
 
   private val healthProducer = msgProvider.getProducer(config)
   Scheduler.scheduleWaitAtMost(1.seconds)(() => {
-    healthProducer.send("health", PingMessage(instance)).andThen {
+    healthProducer.send(s"${Invoker.topicPrefix}health", PingMessage(instance)).andThen {
       case Failure(t) => logging.error(this, s"failed to ping the controller: $t")
     }
   })
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
index d9bb08d..61f9927 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala
@@ -138,12 +138,9 @@
    */
   val memoryQueueFactory = "" // TODO: TBD
 
-  val schedulerConsumer = msgProvider.getConsumer(
-    config,
-    s"scheduler${schedulerId.asString}",
-    s"scheduler${schedulerId.asString}",
-    maxPeek,
-    maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
+  val topic = s"${Scheduler.topicPrefix}scheduler${schedulerId.asString}"
+  val schedulerConsumer =
+    msgProvider.getConsumer(config, topic, topic, maxPeek, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
   implicit val trasnid = TransactionId.containerCreation
 
@@ -171,6 +168,8 @@
 
   protected val protocol = loadConfigOrThrow[String]("whisk.scheduler.protocol")
 
+  val topicPrefix = loadConfigOrThrow[String](ConfigKeys.kafkaTopicsPrefix)
+
   /**
    * The scheduler has two ports, one for akka-remote and the other for akka-grpc.
    */
@@ -236,8 +235,11 @@
     val msgProvider = SpiLoader.get[MessagingProvider]
 
     Seq(
-      ("scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
-      ("creationAck" + instanceId.asString, "creationAck", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
+      (topicPrefix + "scheduler" + instanceId.asString, "actions", Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)),
+      (
+        topicPrefix + "creationAck" + instanceId.asString,
+        "creationAck",
+        Some(ActivationEntityLimit.MAX_ACTIVATION_LIMIT)))
       .foreach {
         case (topic, topicConfigurationKey, maxMessageBytes) =>
           if (msgProvider.ensureTopic(config, topic, topicConfigurationKey, maxMessageBytes).isFailure) {
diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2
index cdae2bd..5ce7f31 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -29,6 +29,10 @@
                 retention-bytes = 1073741824
                 retention-ms    = 3600000
             }
+            prefix = "{{ kafka.topicsPrefix }}"
+            user-event {
+                prefix = "{{ kafka.topicsUserEventPrefix }}"
+            }
         }
         common {
           security-protocol: {{ kafka.protocol }}