Simplify templating in log messages (#6133)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 92c8620..eed4b90 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -92,7 +92,7 @@
             // Add Native brokers
             return pulsar().getLocalZkCache().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT);
         } catch (Exception e) {
-            LOG.error(String.format("[%s] Failed to get active broker list: cluster=%s", clientAppId(), cluster), e);
+            LOG.error("[{}] Failed to get active broker list: cluster={}", clientAppId(), cluster, e);
             throw new RestException(e);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 1d15b00..b0afc73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -65,8 +65,7 @@
                     .map(p -> p.backlog_quota_map.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
                     .orElse(defaultQuota);
         } catch (Exception e) {
-            log.error(String.format("Failed to read policies data, will apply the default backlog quota: namespace=%s",
-                    namespace), e);
+            log.error("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
             return this.defaultQuota;
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7c7637b..21a7d74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1200,7 +1200,7 @@
         try {
             ownedByThisInstance = pulsar.getNamespaceService().isServiceUnitOwned(topicName);
         } catch (Exception e) {
-            log.debug(String.format("Failed to check the ownership of the topic: %s", topicName), e);
+            log.debug("Failed to check the ownership of the topic: {}", topicName, e);
             throw new RuntimeException(new ServerMetadataException(e));
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 166551c..6ad28af 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -437,7 +437,7 @@
             // propagate already wrapped-up WebApplicationExceptions
             throw wae;
         } catch (Exception oe) {
-            log.debug(String.format("Failed to find owner for namespace %s", fqnn), oe);
+            log.debug("Failed to find owner for namespace {}", fqnn, oe);
             throw new RestException(oe);
         }
     }
@@ -452,7 +452,7 @@
             // propagate already wrapped-up WebApplicationExceptions
             throw wae;
         } catch (Exception oe) {
-            log.debug(String.format("Failed to find owner for namespace %s", fqnn), oe);
+            log.debug("Failed to find owner for namespace {}", fqnn, oe);
             throw new RestException(oe);
         }
     }
@@ -544,11 +544,11 @@
             }
         } catch (IllegalArgumentException iae) {
             // namespace format is not valid
-            log.debug(String.format("Failed to find owner for ServiceUnit %s", bundle), iae);
+            log.debug("Failed to find owner for ServiceUnit {}", bundle, iae);
             throw new RestException(Status.PRECONDITION_FAILED,
                     "ServiceUnit format is not expected. ServiceUnit " + bundle);
         } catch (IllegalStateException ise) {
-            log.debug(String.format("Failed to find owner for ServiceUnit %s", bundle), ise);
+            log.debug("Failed to find owner for ServiceUnit {}", bundle, ise);
             throw new RestException(Status.PRECONDITION_FAILED, "ServiceUnit bundle is actived. ServiceUnit " + bundle);
         } catch (NullPointerException e) {
             log.warn("Unable to get web service url");
@@ -592,15 +592,15 @@
             }
         } catch (IllegalArgumentException iae) {
             // namespace format is not valid
-            log.debug(String.format("Failed to find owner for topic :%s", topicName), iae);
+            log.debug("Failed to find owner for topic: {}", topicName, iae);
             throw new RestException(Status.PRECONDITION_FAILED, "Can't find owner for topic " + topicName);
         } catch (IllegalStateException ise) {
-            log.debug(String.format("Failed to find owner for topic:%s", topicName), ise);
+            log.debug("Failed to find owner for topic: {}", topicName, ise);
             throw new RestException(Status.PRECONDITION_FAILED, "Can't find owner for topic " + topicName);
         } catch (WebApplicationException wae) {
             throw wae;
         } catch (Exception oe) {
-            log.debug(String.format("Failed to find owner for topic:%s", topicName), oe);
+            log.debug("Failed to find owner for topic: {}", topicName, oe);
             throw new RestException(oe);
         }
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
index 613d4cc..5d5715e 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
@@ -55,7 +55,7 @@
     private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
         ClientConfigurationData config = notification.getKey();
         PulsarClientImpl client = notification.getValue();
-        LOG.debug("Evicting pulsar client %s with config %s, due to %s",
+        LOG.debug("Evicting pulsar client {} with config {}, due to {}",
             client.toString(), config.toString(), notification.getCause().toString());
         close(config, client);
     };
@@ -68,11 +68,9 @@
         PulsarClientImpl client;
         try {
             client = new PulsarClientImpl(clientConfig);
-            LOG.debug(String.format("Created a new instance of PulsarClientImpl for clientConf = %s",
-                clientConfig.toString()));
+            LOG.debug("Created a new instance of PulsarClientImpl for clientConf = {}", clientConfig.toString());
         } catch (PulsarClientException e) {
-            LOG.error(String.format("Failed to create PulsarClientImpl for clientConf = %s",
-                clientConfig.toString()));
+            LOG.error("Failed to create PulsarClientImpl for clientConf = {}", clientConfig.toString());
             throw e;
         }
         return client;
@@ -84,10 +82,10 @@
 
     private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
         try {
-            LOG.info(String.format("Closing the Pulsar client with conifg %s", clientConfig.toString()));
+            LOG.info("Closing the Pulsar client with config {}", clientConfig.toString());
             client.close();
         } catch (PulsarClientException e) {
-            LOG.warn(String.format("Error while closing the Pulsar client ", clientConfig.toString()), e);
+            LOG.warn("Error while closing the Pulsar client with config {}", clientConfig.toString(), e);
         }
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index 5f46365..f52aa41 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -192,7 +192,7 @@
      * @return the list of events to be processed as a part of the current window
      */
     private List<Event<T>> scanEvents(boolean fullScan) {
-        log.debug(String.format("Scan events, eviction policy %s", evictionPolicy));
+        log.debug("Scan events, eviction policy {}", evictionPolicy);
         List<Event<T>> eventsToExpire = new ArrayList<>();
         List<Event<T>> eventsToProcess = new ArrayList<>();
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 0ad04c3..538a572 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -75,13 +75,11 @@
         // if the dest directory does not exist, create it.
         if (dlogNamespace.logExists(destPkgPath)) {
             // if the destination file exists, write a log message
-            log.info(String.format("Target function file already exists at '%s'. Overwriting it now",
-                    destPkgPath));
+            log.info("Target function file already exists at '{}'. Overwriting it now", destPkgPath);
             dlogNamespace.deleteLog(destPkgPath);
         }
         // copy the topology package to target working directory
-        log.info(String.format("Uploading function package to '%s'",
-                destPkgPath));
+        log.info("Uploading function package to '{}'", destPkgPath);
 
         try (DistributedLogManager dlm = dlogNamespace.openLog(destPkgPath)) {
             try (AppendOnlyStreamWriter writer = dlm.getAppendOnlyStreamWriter()){
diff --git a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
index 36bbe55..7b39714 100644
--- a/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
+++ b/pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java
@@ -109,8 +109,8 @@
                 for (String channelName : channelNames) {
                     ChannelComponent channelComponent = channelComponentMap.get(channelName);
                     if (channelComponent.components.isEmpty()) {
-                        LOGGER.warn(String.format("Channel %s has no components connected" +
-                                " and has been removed.", channelName));
+                        LOGGER.warn("Channel {} has no components connected" +
+                                " and has been removed.", channelName);
                         channelComponentMap.remove(channelName);
                         Map<String, Channel> nameChannelMap =
                                 channelCache.get(channelComponent.channel.getClass());
@@ -118,8 +118,8 @@
                             nameChannelMap.remove(channelName);
                         }
                     } else {
-                        LOGGER.info(String.format("Channel %s connected to %s",
-                                channelName, channelComponent.components.toString()));
+                        LOGGER.info("Channel {} connected to {}",
+                                channelName, channelComponent.components.toString());
                         conf.addChannel(channelName, channelComponent.channel);
                     }
                 }