Check to see if the user can see the topic before creating it, (#4804)
this allows lower privilege users to be set for the controller
and invoker.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index b54d684..addf680 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -68,19 +68,25 @@
val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
def createTopic(retries: Int = 5): Try[Unit] = {
- Try(client.createTopics(List(nt).asJava).values().get(topic).get())
- .map(_ => logging.info(this, s"created topic $topic"))
- .recoverWith {
- case CausedBy(_: TopicExistsException) =>
- Success(logging.info(this, s"topic $topic already existed"))
- case CausedBy(t: RetriableException) if retries > 0 =>
- logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
- Thread.sleep(1.second.toMillis)
- createTopic(retries - 1)
- case t =>
- logging.error(this, s"ensureTopic for $topic failed due to $t")
- Failure(t)
- }
+ Try(client.listTopics().names().get())
+ .map(topics =>
+ if (topics.contains(topic)) {
+ Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
+ } else {
+ Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+ .map(_ => logging.info(this, s"created topic $topic"))
+ .recoverWith {
+ case CausedBy(_: TopicExistsException) =>
+ Success(logging.info(this, s"topic $topic already existed"))
+ case CausedBy(t: RetriableException) if retries > 0 =>
+ logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
+ Thread.sleep(1.second.toMillis)
+ createTopic(retries - 1)
+ case t =>
+ logging.error(this, s"ensureTopic for $topic failed due to $t")
+ Failure(t)
+ }
+ })
}
val result = createTopic()