KAFKA-9046: Use top-level worker configs for connector admin clients

[Jira](https://issues.apache.org/jira/browse/KAFKA-9046)

The changes here are meant to find a healthy compromise between the pre- and post-KIP-458 functionality of Connect workers when configuring admin clients for use with DLQs. Before KIP-458, admin clients were configured using the top-level worker configs; after KIP-458, they are configured using worker configs with a prefix of `admin.` and then optionally overridden by connector configs with a prefix of `admin.override.`. The behavior proposed here is to use, in ascending order of precedence, the top-level worker configs, worker configs prefixed with `admin.`, and connector configs prefixed with `admin.override.`; essentially, use the pre-KIP-458 behavior by default but allow it to be overridden by the post-KIP-458 behavior.

Author: Chris Egerton <chrise@confluent.io>

Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Nigel Liang <nigel@nigelliang.com>

Closes #7525 from C0urante/kafka-9046

(cherry picked from commit 38d243b022336ecaf5cb400ae015c485f56ff978)
Signed-off-by: Manikumar Reddy <manikumar@confluent.io>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index fbece79..814c750 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -607,8 +608,21 @@
                                             Class<? extends Connector> connectorClass,
                                             ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         Map<String, Object> adminProps = new HashMap<>();
-        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        // User-specified overrides
+        // Use the top-level worker configs to retain backwards compatibility with older releases which
+        // did not require a prefix for connector admin client configs in the worker configuration file
+        // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped)
+        // and those that begin with "producer." and "consumer.", since we know they aren't intended for
+        // the admin client
+        Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
+            .filter(e -> !e.getKey().startsWith("admin.")
+                && !e.getKey().startsWith("producer.")
+                && !e.getKey().startsWith("consumer."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+        adminProps.putAll(nonPrefixedWorkerConfigs);
+
+        // Admin client-specific overrides in the worker config
         adminProps.putAll(config.originalsWithPrefix("admin."));
 
         // Connector-specified overrides
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 36a9b66..7021503 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -1138,12 +1138,15 @@
         Map<String, String> props = new HashMap<>(workerProps);
         props.put("admin.client.id", "testid");
         props.put("admin.metadata.max.age.ms", "5000");
+        props.put("producer.bootstrap.servers", "cbeauho.com");
+        props.put("consumer.bootstrap.servers", "localhost:4761");
         WorkerConfig configWithOverrides = new StandaloneConfig(props);
 
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("metadata.max.age.ms", "10000");
 
-        Map<String, String> expectedConfigs = new HashMap<>();
+        Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
@@ -1153,7 +1156,6 @@
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                              null, allConnectorClientConfigOverridePolicy));
-
     }
 
     @Test(expected = ConnectException.class)
@@ -1171,7 +1173,6 @@
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                           null, noneConnectorClientConfigOverridePolicy);
-
     }
 
     private void assertStatusMetrics(long expected, String metricName) {