Check to see if the user can see the topic before creating it, (#4804)

this allows lower privilege users to be set for the controller
and invoker.
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
index b54d684..addf680 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/connector/kafka/KafkaMessagingProvider.scala
@@ -68,19 +68,25 @@
         val nt = new NewTopic(topic, partitions, kafkaConfig.replicationFactor).configs(topicConfig.asJava)
 
         def createTopic(retries: Int = 5): Try[Unit] = {
-          Try(client.createTopics(List(nt).asJava).values().get(topic).get())
-            .map(_ => logging.info(this, s"created topic $topic"))
-            .recoverWith {
-              case CausedBy(_: TopicExistsException) =>
-                Success(logging.info(this, s"topic $topic already existed"))
-              case CausedBy(t: RetriableException) if retries > 0 =>
-                logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
-                Thread.sleep(1.second.toMillis)
-                createTopic(retries - 1)
-              case t =>
-                logging.error(this, s"ensureTopic for $topic failed due to $t")
-                Failure(t)
-            }
+          Try(client.listTopics().names().get())
+            .map(topics =>
+              if (topics.contains(topic)) {
+                Success(logging.info(this, s"$topic already exists and the user can see it, skipping creation."))
+              } else {
+                Try(client.createTopics(List(nt).asJava).values().get(topic).get())
+                  .map(_ => logging.info(this, s"created topic $topic"))
+                  .recoverWith {
+                    case CausedBy(_: TopicExistsException) =>
+                      Success(logging.info(this, s"topic $topic already existed"))
+                    case CausedBy(t: RetriableException) if retries > 0 =>
+                      logging.warn(this, s"topic $topic could not be created because of $t, retries left: $retries")
+                      Thread.sleep(1.second.toMillis)
+                      createTopic(retries - 1)
+                    case t =>
+                      logging.error(this, s"ensureTopic for $topic failed due to $t")
+                      Failure(t)
+                  }
+            })
         }
 
         val result = createTopic()