KAFKA-13328, KAFKA-13329 (1): Add preflight validations for key, value, and header converter classes (#14304)

Reviewers: Yash Mayya <yash.mayya@gmail.com>, Greg Harris <greg.harris@aiven.io>
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 6e1c3ce..924b979 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.lang.reflect.Modifier;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteOrder;
 import java.nio.file.StandardOpenOption;
@@ -1109,6 +1110,15 @@
     }
 
     /**
+     * Closes {@code maybeCloseable} if it implements the {@link AutoCloseable} interface,
+     * and if an exception is thrown, it is logged at the WARN level.
+     */
+    public static void maybeCloseQuietly(Object maybeCloseable, String name) {
+        if (maybeCloseable instanceof AutoCloseable)
+            closeQuietly((AutoCloseable) maybeCloseable, name);
+    }
+
+    /**
      * Closes {@code closeable} and if an exception is thrown, it is logged at the WARN level.
      * <b>Be cautious when passing method references as an argument.</b> For example:
      * <p>
@@ -1577,6 +1587,38 @@
     }
 
     /**
+     * Ensure that the class is concrete (i.e., not abstract), and that it subclasses a given base class.
+     * If it is abstract or does not subclass the given base class, throw a {@link ConfigException}
+     * with a friendly error message suggesting a list of concrete child subclasses (if any are known).
+     * @param baseClass the expected superclass; may not be null
+     * @param klass the class to check; may not be null
+     * @throws ConfigException if the class is not concrete
+     */
+    public static void ensureConcreteSubclass(Class<?> baseClass, Class<?> klass) {
+        Objects.requireNonNull(baseClass);
+        Objects.requireNonNull(klass);
+
+        if (!baseClass.isAssignableFrom(klass)) {
+            String inheritFrom = baseClass.isInterface() ? "implement" : "extend";
+            String baseClassType = baseClass.isInterface() ? "interface" : "class";
+            throw new ConfigException("Class " + klass + " does not " + inheritFrom + " the " + baseClass.getSimpleName() + " " + baseClassType);
+        }
+
+        if (Modifier.isAbstract(klass.getModifiers())) {
+            String childClassNames = Stream.of(klass.getClasses())
+                    .filter(baseClass::isAssignableFrom)
+                    .filter(c -> !Modifier.isAbstract(c.getModifiers()))
+                    .filter(c -> Modifier.isPublic(c.getModifiers()))
+                    .map(Class::getName)
+                    .collect(Collectors.joining(", "));
+            String message = "This class is abstract and cannot be created.";
+            if (!Utils.isBlank(childClassNames))
+                message += " Did you mean " + childClassNames + "?";
+            throw new ConfigException(message);
+        }
+    }
+
+    /**
      * Convert time instant to readable string for logging
      * @param timestamp the timestamp of the instant to be converted.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 4f20a32..f33f00e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -28,12 +28,15 @@
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
+import org.apache.kafka.connect.util.ConcreteSubClassValidator;
+import org.apache.kafka.connect.util.InstantiableClassValidator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -42,7 +45,6 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
@@ -82,10 +84,18 @@
     public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
     public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
     public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
+    private static final ConfigDef.Validator KEY_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+            ConcreteSubClassValidator.forSuperClass(Converter.class),
+            new InstantiableClassValidator()
+    );
 
     public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
     public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
     public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
+    private static final ConfigDef.Validator VALUE_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+            ConcreteSubClassValidator.forSuperClass(Converter.class),
+            new InstantiableClassValidator()
+    );
 
     public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
@@ -93,6 +103,10 @@
     // The Connector config should not have a default for the header converter, since the absence of a config property means that
     // the worker config settings should be used. Thus, we set the default to null here.
     public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
+    private static final ConfigDef.Validator HEADER_CONVERTER_CLASS_VALIDATOR = ConfigDef.CompositeValidator.of(
+            ConcreteSubClassValidator.forSuperClass(HeaderConverter.class),
+            new InstantiableClassValidator()
+    );
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
@@ -181,9 +195,9 @@
                 .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
                 .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
-                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
-                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
-                .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, KEY_CONVERTER_CLASS_VALIDATOR, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, VALUE_CONVERTER_CLASS_VALIDATOR, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
+                .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
                 .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
                 .define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, ++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
                 .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, CONFIG_RELOAD_ACTION_RESTART,
@@ -491,21 +505,11 @@
          * @param cls The subclass of the baseclass.
          */
         ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
-            if (cls == null || !baseClass.isAssignableFrom(cls)) {
-                throw new ConfigException(key, String.valueOf(cls), "Not a " + baseClass.getSimpleName());
+            if (cls == null) {
+                throw new ConfigException(key, null, "Not a " + baseClass.getSimpleName());
             }
-            if (Modifier.isAbstract(cls.getModifiers())) {
-                String childClassNames = Stream.of(cls.getClasses())
-                        .filter(cls::isAssignableFrom)
-                        .filter(c -> !Modifier.isAbstract(c.getModifiers()))
-                        .filter(c -> Modifier.isPublic(c.getModifiers()))
-                        .map(Class::getName)
-                        .collect(Collectors.joining(", "));
-                String message = Utils.isBlank(childClassNames) ?
-                        aliasKind + " is abstract and cannot be created." :
-                        aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
-                throw new ConfigException(key, String.valueOf(cls), message);
-            }
+            Utils.ensureConcreteSubclass(baseClass, cls);
+
             T transformation;
             try {
                 transformation = Utils.newInstance(cls, baseClass);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java
new file mode 100644
index 0000000..cbb4ded
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConcreteSubClassValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+
+public class ConcreteSubClassValidator implements ConfigDef.Validator {
+    private final Class<?> expectedSuperClass;
+
+    private ConcreteSubClassValidator(Class<?> expectedSuperClass) {
+        this.expectedSuperClass = expectedSuperClass;
+    }
+
+    public static ConcreteSubClassValidator forSuperClass(Class<?> expectedSuperClass) {
+        return new ConcreteSubClassValidator(expectedSuperClass);
+    }
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (value == null) {
+            // The value will be null if the class couldn't be found; no point in performing follow-up validation
+            return;
+        }
+
+        Class<?> cls = (Class<?>) value;
+        Utils.ensureConcreteSubclass(expectedSuperClass, cls);
+    }
+
+    @Override
+    public String toString() {
+        return "A concrete subclass of " + expectedSuperClass.getName();
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java
new file mode 100644
index 0000000..796be4e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/InstantiableClassValidator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+public class InstantiableClassValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (value == null) {
+            // The value will be null if the class couldn't be found; no point in performing follow-up validation
+            return;
+        }
+
+        Class<?> cls = (Class<?>) value;
+        try {
+            Object o = cls.getDeclaredConstructor().newInstance();
+            Utils.maybeCloseQuietly(o, o + " (instantiated for preflight validation");
+        } catch (NoSuchMethodException | IllegalAccessException e) {
+            throw new ConfigException(name, cls.getName(), "Could not find a public no-argument constructor for class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+        } catch (ReflectiveOperationException | LinkageError | RuntimeException e) {
+            throw new ConfigException(name, cls.getName(), "Could not instantiate class" + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "A class with a public, no-argument constructor";
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
index 8864611..b3e37c9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorValidationIntegrationTest.java
@@ -16,7 +16,13 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Filter;
 import org.apache.kafka.connect.transforms.predicates.RecordIsTombstone;
@@ -27,6 +33,7 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -297,6 +304,55 @@
     }
 
     @Test
+    public void testConnectorHasInvalidConverterClassType() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(KEY_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a converter with a class of the wrong type is specified",
+                0
+        );
+    }
+
+    @Test
+    public void testConnectorHasAbstractConverter() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(KEY_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when an abstract converter class is specified"
+        );
+    }
+
+    @Test
+    public void testConnectorHasConverterWithNoSuitableConstructor() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a converter class with no suitable constructor is specified"
+        );
+    }
+
+    @Test
+    public void testConnectorHasConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(KEY_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a converter class that throws an exception on instantiation is specified"
+        );
+    }
+
+    @Test
     public void testConnectorHasMissingHeaderConverterClass() throws InterruptedException {
         Map<String, String> config = defaultSinkConnectorProps();
         config.put(HEADER_CONVERTER_CLASS_CONFIG, "WheresTheFruit");
@@ -309,6 +365,111 @@
         );
     }
 
+    @Test
+    public void testConnectorHasInvalidHeaderConverterClassType() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(HEADER_CONVERTER_CLASS_CONFIG, MonitorableSinkConnector.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a header converter with a class of the wrong type is specified"
+        );
+    }
+
+    @Test
+    public void testConnectorHasAbstractHeaderConverter() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(HEADER_CONVERTER_CLASS_CONFIG, AbstractTestConverter.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when an abstract header converter class is specified"
+        );
+    }
+
+    @Test
+    public void testConnectorHasHeaderConverterWithNoSuitableConstructor() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithPrivateConstructor.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a header converter class with no suitable constructor is specified"
+        );
+    }
+
+    @Test
+    public void testConnectorHasHeaderConverterThatThrowsExceptionOnInstantiation() throws InterruptedException {
+        Map<String, String> config = defaultSinkConnectorProps();
+        config.put(HEADER_CONVERTER_CLASS_CONFIG, TestConverterWithConstructorThatThrowsException.class.getName());
+        connect.assertions().assertExactlyNumErrorsOnConnectorConfigValidation(
+                config.get(CONNECTOR_CLASS_CONFIG),
+                config,
+                1,
+                "Connector config should fail preflight validation when a header converter class that throws an exception on instantiation is specified"
+        );
+    }
+
+    public static abstract class TestConverter implements Converter, HeaderConverter {
+
+        // Defined by both Converter and HeaderConverter interfaces
+        @Override
+        public ConfigDef config() {
+            return null;
+        }
+
+        // Defined by Converter interface
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            return null;
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+
+        // Defined by HeaderConverter interface
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+            return null;
+        }
+
+        @Override
+        public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+            return new byte[0];
+        }
+    }
+
+    public static abstract class AbstractTestConverter extends TestConverter {
+    }
+
+    public static class TestConverterWithPrivateConstructor extends TestConverter {
+        private TestConverterWithPrivateConstructor() {
+        }
+    }
+
+    public static class TestConverterWithConstructorThatThrowsException extends TestConverter {
+        public TestConverterWithConstructorThatThrowsException() {
+            throw new ConnectException("whoops");
+        }
+    }
+
     private Map<String, String> defaultSourceConnectorProps() {
         // setup up props for the source connector
         Map<String, String> props = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index c113039..f4e890f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -198,13 +198,10 @@
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", AbstractTransformation.class.getName());
-        try {
-            new ConnectorConfig(MOCK_PLUGINS, props);
-        } catch (ConfigException ex) {
-            assertTrue(
-                ex.getMessage().contains("Transformation is abstract and cannot be created.")
-            );
-        }
+        ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
+        assertTrue(
+            ex.getMessage().contains("This class is abstract and cannot be created.")
+        );
     }
     @Test
     public void abstractKeyValueTransform() {
@@ -213,19 +210,16 @@
         props.put("connector.class", TestConnector.class.getName());
         props.put("transforms", "a");
         props.put("transforms.a.type", AbstractKeyValueTransformation.class.getName());
-        try {
-            new ConnectorConfig(MOCK_PLUGINS, props);
-        } catch (ConfigException ex) {
-            assertTrue(
-                ex.getMessage().contains("Transformation is abstract and cannot be created.")
-            );
-            assertTrue(
-                ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
-            );
-            assertTrue(
-                ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
-            );
-        }
+        ConfigException ex = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
+        assertTrue(
+            ex.getMessage().contains("This class is abstract and cannot be created.")
+        );
+        assertTrue(
+            ex.getMessage().contains(AbstractKeyValueTransformation.Key.class.getName())
+        );
+        assertTrue(
+            ex.getMessage().contains(AbstractKeyValueTransformation.Value.class.getName())
+        );
     }
 
     @Test
@@ -240,7 +234,7 @@
         props.put("predicates", "my-pred");
         props.put("predicates.my-pred.type", TestConnector.class.getName());
         ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
-        assertTrue(e.getMessage().contains("Not a Predicate"));
+        assertEquals("Class " + TestConnector.class + " does not implement the Predicate interface", e.getMessage());
     }
 
     @Test
@@ -287,7 +281,7 @@
         props.put("predicates.my-pred.type", AbstractTestPredicate.class.getName());
         props.put("predicates.my-pred.int", "84");
         ConfigException e = assertThrows(ConfigException.class, () -> new ConnectorConfig(MOCK_PLUGINS, props));
-        assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created"));
+        assertTrue(e.getMessage().contains("This class is abstract and cannot be created"));
     }
 
     private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) {