[ISSUE #230]Optimize plugin loading cloass loader, and support rest API to get connector plugin list (#232)

* Fix debezium demecial type conversion problem #190

* Upgrade rocketmq-replicator API to v0.1.3 #189

* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191

* Debezium mysql source connector delete event causes null pointer #196

* remove local config

* Debezium mysql source connector delete event causes null pointer #196

* Rocketmq replicator running null pointer #205

* Optimize plugin loading cloass loader, and support rest API to get connector plugin list #230

* remove local config

* fixed

* remove invalid references

* fixed

* move getClass method to Utils
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 6571673..bd31d7c 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -248,5 +248,11 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-artifact</artifactId>
+            <version>3.8.1</version>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index 3e958d5..6fdb26f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -129,7 +129,6 @@
                 }
             }
             Plugin plugin = new Plugin(pluginPaths);
-            plugin.initPlugin();
 
             // Create controller and initialize.
             ClusterManagementService clusterManagementService = ServiceProviderUtil.getClusterManagementServices(StagingMode.DISTRIBUTED);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index c7adad2..a214ff7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -128,7 +128,6 @@
                 }
             }
             Plugin plugin = new Plugin(pluginPaths);
-            plugin.initPlugin();
 
             ClusterManagementService clusterManagementService = ServiceProviderUtil.getClusterManagementServices(StagingMode.STANDALONE);
             clusterManagementService.initialize(connectConfig);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index f64b9e9..3e5960e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -18,6 +18,10 @@
 package org.apache.rocketmq.connect.runtime.common;
 
 import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
+
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Set;
@@ -125,6 +129,14 @@
         this.properties = properties;
     }
 
+
+    /**
+     * Gets all original settings with the given prefix.
+     */
+    public Map<String, String> originalsWithPrefix(String prefix) {
+        return originalsWithPrefix(prefix, true);
+    }
+
     /**
      * Gets all original settings with the given prefix.
      * @param prefix the prefix to use as a filter
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index ef3d264..f1e8d7a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -22,6 +22,7 @@
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +40,10 @@
 import java.util.List;
 import java.util.Set;
 
+/**
+ * Transform serial actuator, including the initialization of transform
+ * @param <R>
+ */
 public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@@ -83,7 +89,7 @@
             String transformClassKey = PREFIX + transformStr + "-class";
             String transformClass = config.getString(transformClassKey);
             try {
-                Transform transform = getTransform(transformClass);
+                Transform transform = newTransform(transformClass);
                 KeyValue transformConfig = new DefaultKeyValue();
                 Set<String> configKeys = config.keySet();
                 for (String key : configKeys) {
@@ -121,24 +127,21 @@
         return connectRecord;
     }
 
-    private Transform getTransform(String transformClass) throws Exception {
-        ClassLoader loader = plugin.getPluginClassLoader(transformClass);
-        final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-        Class transformClazz;
-        boolean isolationFlag = false;
-        if (loader instanceof PluginClassLoader) {
-            transformClazz = ((PluginClassLoader) loader).loadClass(transformClass, false);
-            isolationFlag = true;
-        } else {
-            transformClazz = Class.forName(transformClass);
-        }
-        final Transform transform = (Transform) transformClazz.getDeclaredConstructor().newInstance();
-        if (isolationFlag) {
-            Plugin.compareAndSwapLoaders(loader);
-        }
 
-        Plugin.compareAndSwapLoaders(currentThreadLoader);
-        return transform;
+
+    private Transform newTransform(String transformClass) throws Exception {
+        ClassLoader savedLoader = plugin.currentThreadLoader();
+        try {
+            ClassLoader loader = plugin.delegatingLoader().pluginClassLoader(transformClass);
+            savedLoader = Plugin.compareAndSwapLoaders(loader);
+            Class transformClazz = Utils.getContextCurrentClassLoader().loadClass(transformClass);
+            final Transform transform = (Transform) transformClazz.getDeclaredConstructor().newInstance();
+            return transform;
+        } catch (Exception ex) {
+            throw new ConnectException("Load transform failed !!", ex);
+        }finally {
+            Plugin.compareAndSwapLoaders(savedLoader);
+        }
     }
 
     /**
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index ca34ca7..369ec9d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -26,6 +26,7 @@
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -48,6 +49,7 @@
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
 import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -183,31 +185,27 @@
         }
 
         for (String connectorName : newConnectors.keySet()) {
+            ClassLoader savedLoader = plugin.currentThreadLoader();
             try {
                 ConnectKeyValue keyValue = newConnectors.get(connectorName);
                 String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
-                ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
-                final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-                Class clazz;
-                boolean isolationFlag = false;
-                if (loader instanceof PluginClassLoader) {
-                    clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
-                    isolationFlag = true;
-                } else {
-                    clazz = Class.forName(connectorClass);
-                }
-                final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+                ClassLoader connectorLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
+                savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
+
+                // instance connector
+                final Connector connector = plugin.newConnector(connectorClass);
                 WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
-                if (isolationFlag) {
-                    Plugin.compareAndSwapLoaders(loader);
-                }
+
+                // start connector
                 workerConnector.initialize();
                 workerConnector.start();
                 log.info("Connector {} start", workerConnector.getConnectorName());
-                Plugin.compareAndSwapLoaders(currentThreadLoader);
                 this.workingConnectors.add(workerConnector);
             } catch (Exception e) {
                 log.error("worker connector start exception. workerName: " + connectorName, e);
+            } finally {
+                // compare and swap
+                Plugin.compareAndSwapLoaders(savedLoader);
             }
         }
     }
@@ -555,34 +553,32 @@
 
                 ClassLoader savedLoader = plugin.currentThreadLoader();
                 try {
-
                     String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
-                    ClassLoader connectorLoader = plugin.getPluginClassLoader(connType);
+                    ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
                     savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
-
+                    // new task
                     final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
-
                     final Task task = plugin.newTask(taskClass);
 
-                    final String valueConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER_DEFAULT);
-                    final String keyConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER_DEFAULT);
-                    // new stance
-                    RecordConverter valueConverter = Class.forName(valueConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
-                    RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+                    /**
+                     * create key/value converter
+                     */
+                    RecordConverter valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
+                    RecordConverter keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
 
-                    //value config
-                    Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, true);
-                    valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
-                    valueConverter.configure(valueConverterConfig);
+                    if (keyConverter == null) {
+                        keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
+                        log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+                    } else {
+                        log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
+                    }
+                    if (valueConverter == null) {
+                        valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
+                        log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+                    } else {
+                        log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
+                    }
 
-                    //key config
-                    Map<String, String> keyConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, true);
-                    keyConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
-                    keyConverter.configure(keyConverterConfig);
-
-//                    if (isolationFlag) {
-//                        Plugin.compareAndSwapLoaders(loader);
-//                    }
                     if (task instanceof SourceTask) {
                         DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
@@ -622,6 +618,7 @@
                     Plugin.compareAndSwapLoaders(savedLoader);
                 } catch (Exception e) {
                     log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
+                    Plugin.compareAndSwapLoaders(savedLoader);
                 }
             }
         }
@@ -682,23 +679,23 @@
         this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
     }
 
-    private Task getTask(String taskClass) throws Exception {
-        ClassLoader loader = plugin.getPluginClassLoader(taskClass);
-        final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-        Class taskClazz;
-        boolean isolationFlag = false;
-        if (loader instanceof PluginClassLoader) {
-            taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
-            isolationFlag = true;
-        } else {
-            taskClazz = Class.forName(taskClass);
+    private Task getTask(String taskClass){
+        ClassLoader savedLoader = plugin.currentThreadLoader();
+        Task task = null;
+        try {
+            // Get plugin loader
+            ClassLoader taskLoader = plugin.delegatingLoader().pluginClassLoader(taskClass);
+            // Compare and set current loader
+            savedLoader = Plugin.compareAndSwapLoaders(taskLoader);
+            // load class
+            Class taskClazz = Utils.getContextCurrentClassLoader().loadClass(taskClass).asSubclass(Task.class);
+            // new task
+            task = plugin.newTask(taskClazz);
+        } catch (Exception ex ){
+            throw new ConnectException("Create direct task failure", ex);
+        } finally {
+            Plugin.compareAndSwapLoaders(savedLoader);
         }
-        final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
-        if (isolationFlag) {
-            Plugin.compareAndSwapLoaders(loader);
-        }
-
-        Plugin.compareAndSwapLoaders(currentThreadLoader);
         return task;
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 7ca5a43..3bb6818 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -161,4 +161,7 @@
         return connectStatsService;
     }
 
+    public Plugin plugin() {
+        return this.plugin;
+    }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java
new file mode 100644
index 0000000..6153222
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.RecordConverter;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Delegating class Loader
+ */
+public class DelegatingClassLoader extends URLClassLoader {
+    private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+    private static final String CLASSPATH_NAME = "classpath";
+    private static final String UNDEFINED_VERSION = "undefined";
+
+    private final Map<String, SortedMap<PluginWrapper<?>, ClassLoader>> pluginLoaders;
+    private final Map<String, String> aliases;
+    private final SortedSet<PluginWrapper<SinkConnector>> sinkConnectors;
+    private final SortedSet<PluginWrapper<SourceConnector>> sourceConnectors;
+    private final Collection<PluginWrapper<SourceTask>> sourceTasks;
+    private final Collection<PluginWrapper<SinkTask>> sinkTasks;
+    private final SortedSet<PluginWrapper<RecordConverter>> converters;
+    private final SortedSet<PluginWrapper<Transform<?>>> transformations;
+    private final List<String> pluginPaths;
+
+
+    public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
+        super(new URL[0], parent);
+        this.pluginPaths = pluginPaths;
+        this.pluginLoaders = new HashMap<>();
+        this.aliases = new HashMap<>();
+        this.sinkConnectors = new TreeSet<>();
+        this.sourceConnectors = new TreeSet<>();
+        this.sourceTasks = new TreeSet<>();
+        this.sinkTasks = new TreeSet<>();
+        this.converters = new TreeSet<>();
+        this.transformations = new TreeSet<>();
+    }
+
+    public DelegatingClassLoader(List<String> pluginPaths) {
+        this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
+    }
+
+    public Set<PluginWrapper<Connector>> connectors() {
+        Set<PluginWrapper<Connector>> connectors = new TreeSet<>((Set) sinkConnectors);
+        connectors.addAll((Set) sourceConnectors);
+        return connectors;
+    }
+
+    public Set<PluginWrapper<SinkConnector>> sinkConnectors() {
+        return sinkConnectors;
+    }
+
+    public Set<PluginWrapper<SourceConnector>> sourceConnectors() {
+        return sourceConnectors;
+    }
+
+    public Set<PluginWrapper<RecordConverter>> converters() {
+        return converters;
+    }
+
+    public Set<PluginWrapper<Transform<?>>> transformations() {
+        return transformations;
+    }
+
+    /**
+     * Retrieve the PluginClassLoader associated with a plugin class
+     * @param name
+     * @return
+     */
+    public PluginClassLoader pluginClassLoader(String name) {
+//        if (!PluginUtils.shouldLoadInIsolation(name)) {
+//            return null;
+//        }
+        SortedMap<PluginWrapper<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        return pluginLoader instanceof PluginClassLoader
+               ? (PluginClassLoader) pluginLoader
+               : null;
+    }
+
+    public ClassLoader connectorLoader(Connector connector) {
+        return connectorLoader(connector.getClass().getName());
+    }
+
+    public ClassLoader connectorLoader(String connectorClassOrAlias) {
+        String fullName = aliases.containsKey(connectorClassOrAlias)
+                          ? aliases.get(connectorClassOrAlias)
+                          : connectorClassOrAlias;
+        ClassLoader classLoader = pluginClassLoader(fullName);
+        if (classLoader == null) {
+            classLoader = this;
+        }
+        log.debug(
+            "Getting plugin class loader: '{}' for connector: {}",
+            classLoader,
+            connectorClassOrAlias
+        );
+        return classLoader;
+    }
+
+    private static PluginClassLoader newPluginClassLoader(
+            final URL pluginLocation,
+            final URL[] urls,
+            final ClassLoader parent
+    ) {
+        return AccessController.doPrivileged(
+                (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+        );
+    }
+
+    private <T> void addPlugins(Collection<PluginWrapper<T>> plugins, ClassLoader loader) {
+        for (PluginWrapper<T> plugin : plugins) {
+            String pluginClassName = plugin.className();
+            SortedMap<PluginWrapper<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            if (inner == null) {
+                inner = new TreeMap<>();
+                pluginLoaders.put(pluginClassName, inner);
+                log.info("Added plugin '{}'", pluginClassName);
+            }
+            inner.put(plugin, loader);
+        }
+    }
+
+    public void initLoaders() {
+        long begin_time = System.currentTimeMillis();
+        for (String configPath : pluginPaths) {
+            initPluginLoader(configPath);
+        }
+        initPluginLoader(CLASSPATH_NAME);
+        addAllAliases();
+        log.info("Init all plugins cost time = {} ms", System.currentTimeMillis()-begin_time);
+    }
+
+    private void initPluginLoader(String path) {
+        try {
+            if (CLASSPATH_NAME.equals(path)) {
+                scanUrlsAndAddPlugins(
+                        getParent(),
+                        ClasspathHelper.forJavaClassPath().toArray(new URL[0])
+                );
+            } else {
+                Path pluginPath = Paths.get(path).toAbsolutePath();
+                path = pluginPath.toString();
+                if (Files.isDirectory(pluginPath)) {
+                    for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
+                        registerPlugin(pluginLocation);
+                    }
+                } else if (PluginUtils.isArchive(pluginPath)) {
+                    registerPlugin(pluginPath);
+                }
+            }
+        } catch (InvalidPathException | MalformedURLException e) {
+            log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
+        } catch (IOException e) {
+            log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
+        } catch (ReflectiveOperationException e) {
+            log.error("Could not instantiate plugins in: {}. Ignoring.", path, e);
+        }
+    }
+
+    private void registerPlugin(Path pluginLocation)
+            throws ReflectiveOperationException, IOException {
+        log.info("Loading plugin from: {}", pluginLocation);
+        List<URL> pluginUrls = new ArrayList<>();
+        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
+            pluginUrls.add(path.toUri().toURL());
+        }
+        URL[] urls = pluginUrls.toArray(new URL[0]);
+        if (log.isDebugEnabled()) {
+            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+        }
+        PluginClassLoader loader = newPluginClassLoader(
+                pluginLocation.toUri().toURL(),
+                urls,
+                this
+        );
+        scanUrlsAndAddPlugins(loader, urls);
+    }
+
+    private void scanUrlsAndAddPlugins(
+            ClassLoader loader,
+            URL[] urls
+    ) throws ReflectiveOperationException {
+        long begin_time = System.currentTimeMillis();
+        PluginScanResult plugins = doLoad(loader, urls);
+        log.info("Registered loader: {}, cost time = {} ms", loader, System.currentTimeMillis()-begin_time);
+        if (!plugins.isEmpty()) {
+            addPlugins(plugins.sinkConnectors(), loader);
+            sinkConnectors.addAll(plugins.sinkConnectors());
+            addPlugins(plugins.sourceConnectors(), loader);
+            sourceConnectors.addAll(plugins.sourceConnectors());
+            addPlugins(plugins.sourceTasks(), loader);
+            sourceTasks.addAll(plugins.sourceTasks());
+            addPlugins(plugins.sinkTasks(), loader);
+            sinkTasks.addAll(plugins.sinkTasks());
+            addPlugins(plugins.converters(), loader);
+            converters.addAll(plugins.converters());
+            addPlugins(plugins.transformations(), loader);
+            transformations.addAll(plugins.transformations());
+        }
+        loadJdbcDrivers(loader);
+    }
+
+    private void loadJdbcDrivers(final ClassLoader loader) {
+        AccessController.doPrivileged(
+                new PrivilegedAction<Void>() {
+                    @Override
+                    public Void run() {
+                        ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+                                Driver.class,
+                                loader
+                        );
+                        Iterator<Driver> driversIterator = loadedDrivers.iterator();
+                        try {
+                            while (driversIterator.hasNext()) {
+                                Driver driver = driversIterator.next();
+                                log.debug(
+                                        "Registered java.sql.Driver: {} to java.sql.DriverManager",
+                                        driver
+                                );
+                            }
+                        } catch (Throwable t) {
+                            log.debug(
+                                    "Ignoring java.sql.Driver classes listed in resources but not"
+                                            + " present in class loader's classpath: ",
+                                    t
+                            );
+                        }
+                        return null;
+                    }
+                }
+        );
+    }
+
+    private PluginScanResult doLoad(
+            ClassLoader loader,
+            URL[] urls
+    ) throws ReflectiveOperationException {
+        ConfigurationBuilder builder = new ConfigurationBuilder();
+        builder.setClassLoaders(new ClassLoader[]{loader});
+        builder.addUrls(urls);
+        builder.setScanners(new SubTypesScanner());
+        builder.useParallelExecutor();
+        Reflections reflections = new InternalReflections(builder);
+
+        return new PluginScanResult(
+                getPluginWrapper(reflections, SourceConnector.class, loader),
+                getPluginWrapper(reflections, SinkConnector.class, loader),
+                getPluginWrapper(reflections, SourceTask.class, loader),
+                getPluginWrapper(reflections, SinkTask.class, loader),
+                getPluginWrapper(reflections, RecordConverter.class, loader),
+                getTransformationPluginWrapper(loader, reflections)
+        );
+    }
+
+    private Collection<PluginWrapper<Transform<?>>> getTransformationPluginWrapper(ClassLoader loader, Reflections reflections) throws ReflectiveOperationException {
+        return (Collection<PluginWrapper<Transform<?>>>) (Collection<?>) getPluginWrapper(reflections, Transform.class, loader);
+    }
+
+    private <T> Collection<PluginWrapper<T>> getPluginWrapper(
+            Reflections reflections,
+            Class<T> klass,
+            ClassLoader loader
+    ) throws InstantiationException, IllegalAccessException {
+        Set<Class<? extends T>> plugins;
+        try {
+            plugins = reflections.getSubTypesOf(klass);
+        } catch (ReflectionsException e) {
+            log.debug("Reflections scanner could not find any classes for URLs: " +
+                    reflections.getConfiguration().getUrls(), e);
+            return Collections.emptyList();
+        }
+
+        Collection<PluginWrapper<T>> result = new ArrayList<>();
+        for (Class<? extends T> plugin : plugins) {
+            if (PluginUtils.isConcrete(plugin)) {
+                result.add(new PluginWrapper<>(plugin, versionFor(plugin), loader));
+            } else {
+                log.debug("Skipping {} as it is not concrete implementation", plugin);
+            }
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> Collection<PluginWrapper<T>> getServiceLoaderPluginWrapper(Class<T> klass, ClassLoader loader) {
+        ClassLoader savedLoader = Plugin.compareAndSwapLoaders(loader);
+        Collection<PluginWrapper<T>> result = new ArrayList<>();
+        try {
+            ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+            for (T pluginImpl : serviceLoader) {
+                result.add(new PluginWrapper<>((Class<? extends T>) pluginImpl.getClass(),
+                    versionFor(pluginImpl), loader));
+            }
+        } finally {
+            Plugin.compareAndSwapLoaders(savedLoader);
+        }
+        return result;
+    }
+
+    private static <T>  String versionFor(T pluginImpl) {
+//        return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : UNDEFINED_VERSION;
+        return UNDEFINED_VERSION;
+    }
+
+    private static <T> String versionFor(Class<? extends T> pluginKlass) throws IllegalAccessException, InstantiationException {
+        // Temporary workaround until all the plugins are versioned.
+        return Connector.class.isAssignableFrom(pluginKlass) ? versionFor(pluginKlass.newInstance()) : UNDEFINED_VERSION;
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
+        PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+        if (pluginLoader != null) {
+            log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
+            return pluginLoader.loadClass(fullName, resolve);
+        }
+
+        return super.loadClass(fullName, resolve);
+    }
+
+    private void addAllAliases() {
+        addAliases(sourceConnectors);
+        addAliases(sinkConnectors);
+        addAliases(sourceTasks);
+        addAliases(sinkTasks);
+        addAliases(converters);
+        addAliases(transformations);
+    }
+
+    private <S> void addAliases(Collection<PluginWrapper<S>> plugins) {
+        for (PluginWrapper<S> plugin : plugins) {
+            if (PluginUtils.isAliasUnique(plugin, plugins)) {
+                String simple = PluginUtils.simpleName(plugin);
+                String pruned = PluginUtils.prunedName(plugin);
+                aliases.put(simple, plugin.className());
+                if (simple.equals(pruned)) {
+                    log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
+                } else {
+                    aliases.put(pruned, plugin.className());
+                    log.info(
+                            "Added aliases '{}' and '{}' to plugin '{}'",
+                            simple,
+                            pruned,
+                            plugin.className()
+                    );
+                }
+            }
+        }
+    }
+
+    private static class InternalReflections extends Reflections {
+
+        public InternalReflections(Configuration configuration) {
+            super(configuration);
+        }
+
+        // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
+        // as RuntimeException.  Override the scan behavior to emulate the singled-threaded logic.
+        @Override
+        protected void scan(URL url) {
+            try {
+                super.scan(url);
+            } catch (ReflectionsException e) {
+                Logger log = Reflections.log;
+                if (log != null && log.isWarnEnabled()) {
+                    log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
+                }
+            }
+        }
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
index 66350da..7841eef 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
@@ -19,170 +19,65 @@
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.task.Task;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.data.RecordConverter;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.reflections.Configuration;
-import org.reflections.Reflections;
-import org.reflections.ReflectionsException;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ConfigurationBuilder;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Plugin extends URLClassLoader {
+public class Plugin {
+
+    public enum ClassLoaderUsage {
+        CURRENT_CLASSLOADER,
+        PLUGINS
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Plugin.class);
+    private final DelegatingClassLoader delegatingLoader;
 
-    private final List<String> pluginPaths;
-
-    private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
-
-    public Plugin(List<String> pluginPaths) {
-        this(pluginPaths, Plugin.class.getClassLoader());
+    public Plugin(List<String> pluginLocations) {
+        delegatingLoader = newDelegatingClassLoader(pluginLocations);
+        delegatingLoader.initLoaders();
     }
 
-    public Plugin(List<String> pluginPaths, ClassLoader parent) {
-        super(new URL[0], parent);
-        this.pluginPaths = pluginPaths;
+    public void initLoaders(){
+        delegatingLoader.initLoaders();
     }
 
-    public void initPlugin() {
-        for (String configPath : pluginPaths) {
-            loadPlugin(configPath);
-        }
-    }
-
-    private void loadPlugin(String path) {
-        Path pluginPath = Paths.get(path).toAbsolutePath();
-        path = pluginPath.toString();
-        try {
-            if (Files.isDirectory(pluginPath)) {
-                for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
-                    registerPlugin(pluginLocation);
-                }
-            } else if (PluginUtils.isArchive(pluginPath)) {
-                registerPlugin(pluginPath);
-            }
-        } catch (IOException e) {
-            log.error("register plugin error, path: {}, e: {}", path, e);
-        }
-    }
-
-    private void doLoad(
-        ClassLoader loader,
-        URL[] urls
-    ) {
-        ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[] {loader});
-        builder.addUrls(urls);
-        builder.setScanners(new SubTypesScanner());
-        builder.useParallelExecutor();
-        Reflections reflections = new PluginReflections(builder);
-        getPlugin(reflections, Connector.class, loader);
-        getPlugin(reflections, Task.class, loader);
-        getPlugin(reflections, Transform.class, loader);
-    }
-
-    private <T> Collection<Class<? extends T>> getPlugin(
-        Reflections reflections,
-        Class<T> klass,
-        ClassLoader loader
-    ) {
-        Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);
-        Collection<Class<? extends T>> result = new ArrayList<>();
-        for (Class<? extends T> plugin : plugins) {
-            classLoaderMap.put(plugin.getName(), new PluginWrapper(plugin, loader));
-            result.add(plugin);
-
-        }
-        return result;
-    }
-
-    private static class PluginReflections extends Reflections {
-
-        public PluginReflections(Configuration configuration) {
-            super(configuration);
-        }
-
-        @Override
-        protected void scan(URL url) {
-            try {
-                super.scan(url);
-            } catch (ReflectionsException e) {
-                Logger log = Reflections.log;
-                if (log != null && log.isWarnEnabled()) {
-                    log.warn("Scan url error. ignoring the exception and continuing", e);
-                }
-            }
-        }
-    }
-
-    private static PluginClassLoader newPluginClassLoader(
-        final URL pluginLocation,
-        final URL[] urls,
-        final ClassLoader parent
-    ) {
+    protected DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
         return AccessController.doPrivileged(
-            (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+                (PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(paths)
         );
     }
 
-    private void registerPlugin(Path pluginLocation)
-        throws IOException {
-        log.info("Loading plugin from: {}", pluginLocation);
-        List<URL> pluginUrls = new ArrayList<>();
-        for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
-            pluginUrls.add(path.toUri().toURL());
-        }
-        URL[] urls = pluginUrls.toArray(new URL[0]);
-        if (log.isDebugEnabled()) {
-            log.debug("Loading plugin urls: {}", Arrays.toString(urls));
-        }
-        PluginClassLoader loader = newPluginClassLoader(
-            pluginLocation.toUri().toURL(),
-            urls,
-            this
-        );
-        doLoad(loader, urls);
+    public DelegatingClassLoader delegatingLoader() {
+        return delegatingLoader;
+    }
+    public Set<PluginWrapper<SinkConnector>> sinkConnectors() {
+        return delegatingLoader.sinkConnectors();
     }
 
-    public ClassLoader getPluginClassLoader(String pluginName) {
-        PluginWrapper pluginWrapper = classLoaderMap.get(pluginName);
-        if (null != pluginWrapper) {
-            return pluginWrapper.getClassLoader();
-        }
-        return null;
+    public Set<PluginWrapper<SourceConnector>> sourceConnectors() {
+        return delegatingLoader.sourceConnectors();
     }
 
-    public Task newTask(Class<? extends Task> taskClass) {
-        return newPlugin(taskClass);
+    public Set<PluginWrapper<RecordConverter>> converters() {
+        return delegatingLoader.converters();
     }
 
-    protected static <T> T newPlugin(Class<T> klass) {
-        // KAFKA-8340: The thread classloader is used during static initialization and must be
-        // set to the plugin's classloader during instantiation
-        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
-        try {
-            return klass.getDeclaredConstructor().newInstance();
-        } catch (Throwable t) {
-            throw new ConnectException("Instantiation error", t);
-        } finally {
-            compareAndSwapLoaders(savedLoader);
-        }
+    public Set<PluginWrapper<Transform<?>>> transformations() {
+        return delegatingLoader.transformations();
     }
 
     public ClassLoader currentThreadLoader() {
@@ -191,10 +86,183 @@
 
     public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
         ClassLoader current = Thread.currentThread().getContextClassLoader();
-        if (null != current && !current.equals(loader)) {
+        if (!current.equals(loader)) {
             Thread.currentThread().setContextClassLoader(loader);
         }
         return current;
     }
 
+    protected static <U> Class<? extends U> pluginClass(
+            DelegatingClassLoader loader,
+            String classOrAlias,
+            Class<U> pluginClass
+    ) throws ClassNotFoundException {
+        Class<?> klass = loader.loadClass(classOrAlias, false);
+        if (pluginClass.isAssignableFrom(klass)) {
+            return (Class<? extends U>) klass;
+        }
+
+        throw new ClassNotFoundException(
+                "Requested class: "
+                        + classOrAlias
+                        + " does not extend " + pluginClass.getSimpleName()
+        );
+    }
+
+    public Connector newConnector(String connectorClassOrAlias) {
+        Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
+        return newPlugin(klass);
+    }
+
+    public Class<? extends Connector> connectorClass(String connectorClassOrAlias) {
+        Class<? extends Connector> klass;
+        try {
+            klass = pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class);
+        } catch (ClassNotFoundException e) {
+            List<PluginWrapper<? extends Connector>> matches = new ArrayList<>();
+            Set<PluginWrapper<Connector>> connectors = delegatingLoader.connectors();
+            for (PluginWrapper<? extends Connector> plugin : connectors) {
+                Class<?> pluginClass = plugin.pluginClass();
+                String simpleName = pluginClass.getSimpleName();
+                if (simpleName.equals(connectorClassOrAlias)
+                        || simpleName.equals(connectorClassOrAlias + "Connector")) {
+                    matches.add(plugin);
+                }
+            }
+            if (matches.isEmpty()) {
+                throw new ConnectException(
+                        "Failed to find any class that implements Connector and which name matches "
+                                + connectorClassOrAlias
+                                + ", available connectors are: "
+                                + Utils.join(connectors, ", ")
+                );
+            }
+
+            // conflict connector
+            if (matches.size() > 1) {
+                throw new ConnectException(
+                        "More than one connector matches alias "
+                                + connectorClassOrAlias
+                                + ". Please use full package and class name instead. Classes found: "
+                                + Utils.join(connectors, ", ")
+                );
+            }
+
+            PluginWrapper<? extends Connector> entry = matches.get(0);
+            klass = entry.pluginClass();
+        }
+        return klass;
+    }
+
+    public Task newTask(Class<? extends Task> taskClass) {
+        return newPlugin(taskClass);
+    }
+
+    public RecordConverter newConverter(ConnectKeyValue config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
+        if (!config.containsKey(classPropertyName)) {
+            return null;
+        }
+        Class<? extends RecordConverter> klass = null;
+        switch (classLoaderUsage) {
+            case CURRENT_CLASSLOADER:
+                klass = pluginClassFromConfig(config, classPropertyName, RecordConverter.class, delegatingLoader.converters());
+                break;
+            case PLUGINS:
+                String converterClassOrAlias = Utils.getClass(config,classPropertyName).getName();
+                try {
+                    klass = pluginClass(delegatingLoader, converterClassOrAlias, RecordConverter.class);
+                } catch (ClassNotFoundException e) {
+                    throw new ConnectException(
+                            "Failed to find any class that implements Converter and which name matches "
+                                    + converterClassOrAlias + ", available converters are: "
+                                    + pluginNames(delegatingLoader.converters())
+                    );
+                }
+                break;
+        }
+        if (klass == null) {
+            throw new ConnectException("Unable to initialize the Converter specified in '" + classPropertyName + "'");
+        }
+
+        // Configure the Converter using only the old configuration mechanism ...
+        String configPrefix = classPropertyName + ".";
+        Map<String, String> converterConfig = config.originalsWithPrefix(configPrefix);
+        log.debug("Configuring the converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+        RecordConverter plugin;
+        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            plugin = newPlugin(klass);
+            plugin.configure(converterConfig);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
+        }
+        return plugin;
+    }
+
+
+    public <T> List<T> newPlugins(List<String> klassNames, ConnectKeyValue config, Class<T> pluginKlass) {
+        List<T> plugins = new ArrayList<>();
+        if (klassNames != null) {
+            for (String klassName : klassNames) {
+                plugins.add(newPlugin(klassName, config, pluginKlass));
+            }
+        }
+        return plugins;
+    }
+
+    protected static <T> T newPlugin(Class<T> klass) {
+        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            return Utils.newInstance(klass);
+        } catch (Throwable t) {
+            throw new ConnectException("Instantiation error", t);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
+        }
+    }
+
+
+    public <T> T newPlugin(String klassName, ConnectKeyValue config, Class<T> pluginKlass) {
+        T plugin;
+        Class<? extends T> klass;
+        try {
+            klass = pluginClass(delegatingLoader, klassName, pluginKlass);
+        } catch (ClassNotFoundException e) {
+            String msg = String.format("Failed to find any class that implements %s and which "
+                    + "name matches %s", pluginKlass, klassName);
+            throw new ConnectException(msg);
+        }
+        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            plugin = newPlugin(klass);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
+        }
+        return plugin;
+    }
+
+    private static <T> String pluginNames(Collection<PluginWrapper<T>> plugins) {
+        return Utils.join(plugins, ", ");
+    }
+
+
+    protected <U> Class<? extends U> pluginClassFromConfig(
+            ConnectKeyValue config,
+            String propertyName,
+            Class<U> pluginClass,
+            Collection<PluginWrapper<U>> plugins
+    ) {
+        Class<?> klass = Utils.getClass(config, propertyName);
+        if (pluginClass.isAssignableFrom(klass)) {
+            return (Class<? extends U>) klass;
+        }
+        throw new ConnectException(
+                "Failed to find any class that implements " + pluginClass.getSimpleName()
+                        + " for the config "
+                        + propertyName + ", available classes are: "
+                        + pluginNames(plugins)
+        );
+    }
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
index c5419b4..6924e7a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
@@ -21,6 +21,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ *  A custom classloader dedicated to loading Connect plugin classes in classloading isolation.
+ */
 public class PluginClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
     private final URL pluginLocation;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java
new file mode 100644
index 0000000..ad7868d
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.RecordConverter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * plugin scan result
+ */
+public class PluginScanResult {
+    private final Collection<PluginWrapper<SourceConnector>> sourceConnectors;
+    private final Collection<PluginWrapper<SinkConnector>> sinkConnectors;
+    private final Collection<PluginWrapper<SourceTask>> sourceTasks;
+    private final Collection<PluginWrapper<SinkTask>> sinkTasks;
+    private final Collection<PluginWrapper<RecordConverter>> converters;
+    private final Collection<PluginWrapper<Transform<?>>> transformations;
+
+    private final List<Collection> allPlugins;
+
+    public PluginScanResult(
+            Collection<PluginWrapper<SourceConnector>> sourceConnectors,
+            Collection<PluginWrapper<SinkConnector>> sinkConnectors,
+            Collection<PluginWrapper<SourceTask>> sourceTasks,
+            Collection<PluginWrapper<SinkTask>> sinkTasks,
+            Collection<PluginWrapper<RecordConverter>> converters,
+            Collection<PluginWrapper<Transform<?>>> transformations
+    ) {
+        this.sinkConnectors = sinkConnectors;
+        this.sourceConnectors = sourceConnectors;
+        this.sourceTasks = sourceTasks;
+        this.sinkTasks = sinkTasks;
+        this.converters = converters;
+        this.transformations = transformations;
+        this.allPlugins =
+            Arrays.asList(sinkConnectors, sourceConnectors, sourceTasks, sinkTasks, converters, transformations);
+    }
+
+    public Collection<PluginWrapper<SinkConnector>> sinkConnectors() {
+        return sinkConnectors;
+    }
+
+    public Collection<PluginWrapper<SourceConnector>> sourceConnectors() {
+        return sourceConnectors;
+    }
+
+    public Collection<PluginWrapper<SourceTask>> sourceTasks() {
+        return sourceTasks;
+    }
+
+    public Collection<PluginWrapper<SinkTask>> sinkTasks() {
+        return sinkTasks;
+    }
+
+
+    public Collection<PluginWrapper<RecordConverter>> converters() {
+        return converters;
+    }
+
+
+    public Collection<PluginWrapper<Transform<?>>> transformations() {
+        return transformations;
+    }
+
+    public boolean isEmpty() {
+        boolean isEmpty = true;
+        for (Collection plugins : allPlugins) {
+            isEmpty = isEmpty && plugins.isEmpty();
+        }
+        return isEmpty;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java
new file mode 100644
index 0000000..ede4607
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.data.RecordConverter;
+
+import java.util.Locale;
+
+/**
+ * plugin type
+ */
+public enum PluginType {
+    SOURCE(SourceConnector.class),
+    SINK(SinkConnector.class),
+    CONNECTOR(Connector.class),
+    CONVERTER(RecordConverter.class),
+    TRANSFORMATION(Transform.class),
+    UNKNOWN(Object.class);
+
+    private Class<?> klass;
+
+    PluginType(Class<?> klass) {
+        this.klass = klass;
+    }
+
+    public static PluginType from(Class<?> klass) {
+        for (PluginType type : PluginType.values()) {
+            if (type.klass.isAssignableFrom(klass)) {
+                return type;
+            }
+        }
+        return UNKNOWN;
+    }
+
+    public String simpleName() {
+        return klass.getSimpleName();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString().toLowerCase(Locale.ROOT);
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index d46a7e1..4a804ca 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -17,11 +17,13 @@
 package org.apache.rocketmq.connect.runtime.controller.isolation;
 
 import java.io.IOException;
+import java.lang.reflect.Modifier;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -120,6 +122,16 @@
         + ")\\..*$"
         + "|io\\.openmessaging\\.KeyValue");
 
+
+    // If the base interface or class that will be used to identify Connect plugins resides within
+    // the same java package as the plugins that need to be loaded in isolation (and thus are
+    // added to the INCLUDE pattern), then this base interface or class needs to be excluded in the
+    // regular expression pattern
+    private static final Pattern INCLUDE = Pattern.compile("^(?:"
+            + "|org\\.apache\\.rocketmq\\.connect"
+            + ")\\..*$");
+
+
     private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
         .Filter<Path>() {
         @Override
@@ -152,6 +164,83 @@
         return locations;
     }
 
+    /**
+     * Verify the given class corresponds to a concrete class and not to an abstract class or
+     */
+    public static boolean isConcrete(Class<?> klass) {
+        int mod = klass.getModifiers();
+        return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
+    }
+
+    /**
+     * Return whether the class with the given name should be loaded in isolation using a plugin
+     * classloader.
+     *
+     * @param name the fully qualified name of the class.
+     * @return true if this class should be loaded in isolation, false otherwise.
+     */
+    public static boolean shouldLoadInIsolation(String name) {
+        return !(BLACKLIST.matcher(name).matches() && !INCLUDE.matcher(name).matches());
+    }
+    /**
+     * Verify whether a given plugin's alias matches another alias in a collection of plugins.
+     *
+     * @param alias the plugin descriptor to test for alias matching.
+     * @param plugins the collection of plugins to test against.
+     * @param <U> the plugin type.
+     * @return false if a match was found in the collection, otherwise true.
+     */
+    public static <U> boolean isAliasUnique(
+            PluginWrapper<U> alias,
+            Collection<PluginWrapper<U>> plugins
+    ) {
+        boolean matched = false;
+        for (PluginWrapper<U> plugin : plugins) {
+            if (simpleName(alias).equals(simpleName(plugin))
+                    || prunedName(alias).equals(prunedName(plugin))) {
+                if (matched) {
+                    return false;
+                }
+                matched = true;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Return the simple class name of a plugin as {@code String}.
+     *
+     * @param plugin the plugin descriptor.
+     * @return the plugin's simple class name.
+     */
+    public static String simpleName(PluginWrapper<?> plugin) {
+        return plugin.pluginClass().getSimpleName();
+    }
+
+    /**
+     * Remove the plugin type name at the end of a plugin class name, if such suffix is present.
+     * This method is meant to be used to extract plugin aliases.
+     */
+    public static String prunedName(PluginWrapper<?> plugin) {
+        switch (plugin.type()) {
+            case SOURCE:
+            case SINK:
+            case CONNECTOR:
+                return prunePluginName(plugin, "Connector");
+            default:
+                return prunePluginName(plugin, plugin.type().simpleName());
+        }
+    }
+
+    private static String prunePluginName(PluginWrapper<?> plugin, String suffix) {
+        String simple = plugin.pluginClass().getSimpleName();
+        int pos = simple.lastIndexOf(suffix);
+        if (pos > 0) {
+            return simple.substring(0, pos);
+        }
+        return simple;
+    }
+
     public static List<Path> pluginUrls(Path topPath) throws IOException {
         boolean containsClassFiles = false;
         Set<Path> archives = new TreeSet<>();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
index 070f6ce..0e713fc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
@@ -16,19 +16,26 @@
  */
 package org.apache.rocketmq.connect.runtime.controller.isolation;
 
-import io.openmessaging.connector.api.component.task.sink.SinkConnector;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
-import java.util.Locale;
 
-public class PluginWrapper<T> {
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+
+import java.util.Objects;
+
+/**
+ * @param <T>
+ */
+public class PluginWrapper<T> implements Comparable<PluginWrapper<T>>{
     private final Class<? extends T> klass;
     private final String name;
     private final PluginType type;
     private final String typeName;
     private final String location;
+    private final String version;
+    private final DefaultArtifactVersion encodedVersion;
+
     private final ClassLoader classLoader;
 
-    public PluginWrapper(Class<? extends T> klass, ClassLoader loader) {
+    public PluginWrapper(Class<? extends T> klass, String version, ClassLoader loader) {
         this.klass = klass;
         this.name = klass.getName();
         this.type = PluginType.from(klass);
@@ -37,39 +44,60 @@
         this.location = loader instanceof PluginClassLoader
             ? ((PluginClassLoader) loader).location()
             : "classpath";
+        this.version = version != null ? version : "null";
+        this.encodedVersion = new DefaultArtifactVersion(this.version);
     }
 
     public ClassLoader getClassLoader() {
         return this.classLoader;
     }
 
-    public enum PluginType {
-        SOURCE(SourceConnector.class),
-        SINK(SinkConnector.class),
-        UNKNOWN(Object.class);
+    public Class<? extends T> pluginClass() {
+        return klass;
+    }
 
-        private Class<?> klass;
+    public String className() {
+        return name;
+    }
 
-        PluginType(Class<?> klass) {
-            this.klass = klass;
+    public String version() {
+        return version;
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    public String typeName() {
+        return typeName;
+    }
+
+    public String location() {
+        return location;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
         }
-
-        public static PluginType from(Class<?> klass) {
-            for (PluginType type : PluginType.values()) {
-                if (type.klass.isAssignableFrom(klass)) {
-                    return type;
-                }
-            }
-            return UNKNOWN;
+        if (!(o instanceof PluginWrapper)) {
+            return false;
         }
+        PluginWrapper<?> that = (PluginWrapper<?>) o;
+        return Objects.equals(klass, that.klass) &&
+                Objects.equals(version, that.version) &&
+                type == that.type;
+    }
 
-        public String simpleName() {
-            return klass.getSimpleName();
-        }
+    @Override
+    public int hashCode() {
+        return Objects.hash(klass, version, type);
+    }
 
-        @Override
-        public String toString() {
-            return super.toString().toLowerCase(Locale.ROOT);
-        }
+    @Override
+    public int compareTo(PluginWrapper other) {
+        int nameComp = name.compareTo(other.name);
+        return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
index 644f024..cc75862 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
@@ -71,8 +71,6 @@
     }
 
     /**
-     * Set the record consumed from Kafka in a sink connector.
-     *
      * @param consumedMessage the record
      */
     public void consumerRecord(MessageExt consumedMessage) {
@@ -80,9 +78,7 @@
         reset();
     }
 
-    /**
-     * @return the record consumed from Kafka. could be null
-     */
+
     public MessageExt consumerRecord() {
         return consumedMessage;
     }
@@ -96,7 +92,6 @@
 
     /**
      * Set the source record being processed in the connect pipeline.
-     *
      * @param record the source record
      */
     public void sourceRecord(ConnectRecord record) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
new file mode 100644
index 0000000..fd2c13e
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest;
+
+import com.alibaba.fastjson.JSON;
+import io.javalin.Context;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginType;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginWrapper;
+import org.apache.rocketmq.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ConnectorPluginsResource {
+
+    private final AbstractConnectController connectController;
+    private final List<PluginInfo> connectorPlugins;
+
+    static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList();
+    static final List<Class<? extends SourceConnector>> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList();
+
+
+    public ConnectorPluginsResource(AbstractConnectController connectController) {
+        this.connectController = connectController;
+        this.connectorPlugins = new ArrayList<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        addConnectorPlugins(connectController.plugin().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
+        addConnectorPlugins(connectController.plugin().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES);
+        addConnectorPlugins(connectController.plugin().transformations(), new ArrayList<>());
+        addConnectorPlugins(connectController.plugin().converters(), Collections.emptySet());
+    }
+
+    private <T> void addConnectorPlugins(Collection<PluginWrapper<T>> plugins, Collection<Class<? extends T>> excludes) {
+        plugins.stream()
+                .filter(p -> !excludes.contains(p.pluginClass()))
+                .map(PluginInfo::new)
+                .forEach(connectorPlugins::add);
+    }
+
+
+    /**
+     * validate plugin configs
+     * @param context
+     * @throws Throwable
+     */
+    public void validateConfigs(Context context) {
+        // No-op
+    }
+
+    /**
+     * list connector plugins
+     * @param context
+     * @return
+     */
+    public void listConnectorPlugins(Context context) {
+       boolean connectorsOnly = context.anyFormParamNull("connectorsOnly")
+               ? false : Boolean.parseBoolean(context.pathParam("connectorsOnly")) ;
+        synchronized (this) {
+            if (connectorsOnly) {
+                List<PluginInfo> pluginInfos = Collections.unmodifiableList(connectorPlugins.stream()
+                        .filter(p -> PluginType.SINK.toString().equals(p.getType()) || PluginType.SOURCE.toString().equals(p.getType()))
+                        .collect(Collectors.toList()));
+                context.result(JSON.toJSONString(pluginInfos));
+            } else {
+                context.result(JSON.toJSONString(Collections.unmodifiableList(connectorPlugins)));
+            }
+        }
+    }
+
+    /**
+     * Get connector config def
+     * @param context
+     * @return
+     */
+    public List<ConfigKeyInfo> getConnectorConfigDef(Context context) {
+        // No-op
+        return Collections.emptyList();
+    }
+
+    /**
+     * reload plugins
+     * @param context
+     */
+    public void reloadPlugins(Context context) {
+        connectController.getConfigManagementService().getPlugin().initLoaders();
+        context.result("success");
+    }
+}
+
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 1eda51f..c49d5a7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -47,8 +47,12 @@
 
     private static final String TASK_CONFIGS = "taskConfigs";
 
+    /** connector plugin resource */
+    private ConnectorPluginsResource pluginsResource;
     public RestHandler(AbstractConnectController connectController) {
         this.connectController = connectController;
+        pluginsResource = new ConnectorPluginsResource(connectController);
+
         Javalin app = Javalin.create();
         app.enableCaseSensitiveUrls();
         app = app.start(connectController.getConnectConfig().getHttpPort());
@@ -70,7 +74,13 @@
         app.get("/getConfigInfo", this::getConfigInfo);
         app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
         app.get("/getAllocatedTasks", this::getAllocatedTasks);
-        app.get("/plugin/reload", this::reloadPlugins);
+
+        /**plugin resource handler*/
+        app.get("/plugin/reload", context -> pluginsResource.reloadPlugins(context));
+        app.get("/plugin/list", context -> pluginsResource.listConnectorPlugins(context));
+        app.get("/plugin/config", context -> pluginsResource.getConnectorConfigDef(context));
+        app.get("/plugin/config/validate", context -> pluginsResource.validateConfigs(context));
+
     }
 
 
@@ -244,8 +254,4 @@
         return result;
     }
 
-    private void reloadPlugins(Context context) {
-        connectController.getConfigManagementService().getPlugin().initPlugin();
-        context.result("success");
-    }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java
new file mode 100644
index 0000000..851ec47
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest.entities;
+
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigKeyInfo {
+
+    private String name;
+    private String type;
+    private boolean required;
+    private String defaultValue;
+    private String importance;
+    private String documentation;
+    private String group;
+    private int orderInGroup;
+    private String width;
+    private String displayName;
+    private List<String> dependents;
+
+    public ConfigKeyInfo(String name,
+                         String type,
+                         boolean required,
+                         String defaultValue,
+                         String importance,
+                         String documentation,
+                         String group,
+                         int orderInGroup,
+                         String width,
+                         String displayName,
+                         List<String> dependents) {
+        this.name = name;
+        this.type = type;
+        this.required = required;
+        this.defaultValue = defaultValue;
+        this.importance = importance;
+        this.documentation = documentation;
+        this.group = group;
+        this.orderInGroup = orderInGroup;
+        this.width = width;
+        this.displayName = displayName;
+        this.dependents = dependents;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public boolean isRequired() {
+        return required;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public String getImportance() {
+        return importance;
+    }
+
+    public String getDocumentation() {
+        return documentation;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public int getOrderInGroup() {
+        return orderInGroup;
+    }
+
+    public String getWidth() {
+        return width;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public List<String> getDependents() {
+        return dependents;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ConfigKeyInfo that = (ConfigKeyInfo) o;
+        return Objects.equals(name, that.name) &&
+               Objects.equals(type, that.type) &&
+               Objects.equals(required, that.required) &&
+               Objects.equals(defaultValue, that.defaultValue) &&
+               Objects.equals(importance, that.importance) &&
+               Objects.equals(documentation, that.documentation) &&
+               Objects.equals(group, that.group) &&
+               Objects.equals(orderInGroup, that.orderInGroup) &&
+               Objects.equals(width, that.width) &&
+               Objects.equals(displayName, that.displayName) &&
+               Objects.equals(dependents, that.dependents);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[")
+            .append(name)
+            .append(",")
+            .append(type)
+            .append(",")
+            .append(required)
+            .append(",")
+            .append(defaultValue)
+            .append(",")
+            .append(importance)
+            .append(",")
+            .append(documentation)
+            .append(",")
+            .append(group)
+            .append(",")
+            .append(orderInGroup)
+            .append(",")
+            .append(width)
+            .append(",")
+            .append(displayName)
+            .append(",")
+            .append(dependents)
+            .append("]");
+        return sb.toString();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
new file mode 100644
index 0000000..007b97c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest.entities;
+
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginType;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginWrapper;
+
+import java.util.Objects;
+
+/**
+ * plugin info
+ */
+public class PluginInfo {
+    private String className;
+    private PluginType type;
+    private String version;
+
+    public PluginInfo(String className, PluginType type, String version) {
+        this.className = className;
+        this.type = type;
+        this.version = version;
+    }
+
+    public PluginInfo(PluginWrapper<?> plugin) {
+        this(plugin.className(), plugin.type(), plugin.version());
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public void setClassName(String className) {
+        this.className = className;
+    }
+
+    public PluginType getType() {
+        return type;
+    }
+
+    public void setType(PluginType type) {
+        this.type = type;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof PluginInfo)) return false;
+        PluginInfo that = (PluginInfo) o;
+        return Objects.equals(className, that.className) && type == that.type && Objects.equals(version, that.version);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(className, type, version);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 8792593..346d5d0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -25,8 +25,6 @@
  */
 public interface ClusterManagementService {
 
-    Long WORKER_TIME_OUT = 30 * 1000L;
-
     void initialize(ConnectConfig connectConfig);
 
     /**
@@ -69,8 +67,16 @@
      */
     void registerListener(WorkerStatusListener listener);
 
+    /**
+     * get current run worker
+     * @return
+     */
     String getCurrentWorker();
 
+    /**
+     * staging mode
+     * @return
+     */
     StagingMode getStagingMode();
 
     interface WorkerStatusListener {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index a37fea0..6496c97 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -177,7 +177,7 @@
         }
 
         String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
-        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
+        ClassLoader classLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
         Class clazz;
         if (null != classLoader) {
             clazz = Class.forName(connectorClass, true, classLoader);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 06256ad..f5a8c0f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -18,6 +18,7 @@
 
 
 import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -27,6 +28,7 @@
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,19 +131,22 @@
             }
         }
 
+        ClassLoader savedLoader = plugin.currentThreadLoader();
         String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
-        ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
-        Class clazz;
-        if (null != classLoader) {
-            clazz = Class.forName(connectorClass, true, classLoader);
-        } else {
-            clazz = Class.forName(connectorClass);
+        ClassLoader connectLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
+        savedLoader = Plugin.compareAndSwapLoaders(connectLoader);
+        try {
+            Class clazz = Utils.getContextCurrentClassLoader().loadClass(connectorClass);
+            final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+            connector.validate(configs);
+            connector.start(configs);
+            connectorKeyValueStore.put(connectorName, configs);
+            recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
+        } catch (Exception ex) {
+            throw new ConnectException(ex);
+        } finally {
+            Plugin.compareAndSwapLoaders(savedLoader);
         }
-        final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
-        connector.validate(configs);
-        connector.start(configs);
-        connectorKeyValueStore.put(connectorName, configs);
-        recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
         return "";
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
index 79f52c5..5aea5f1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -17,9 +17,17 @@
 
 package org.apache.rocketmq.connect.runtime.utils;
 
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+
 /**
  *  common utils
  */
@@ -52,4 +60,83 @@
         }
     }
 
+    /**
+     * Instantiate the class
+     */
+    public static <T> T newInstance(Class<T> c) {
+        if (c == null) {
+            throw new ConnectException("class cannot be null");
+        }
+        try {
+            return c.getDeclaredConstructor().newInstance();
+        } catch (NoSuchMethodException e) {
+            throw new ConnectException("Could not find a public no-argument constructor for " + c.getName(), e);
+        } catch (ReflectiveOperationException | RuntimeException e) {
+            throw new ConnectException("Could not instantiate class " + c.getName(), e);
+        }
+    }
+
+    public static <T> String join(T[] strs, String separator) {
+        return join(Arrays.asList(strs), separator);
+    }
+
+    public static <T> String join(Collection<T> collection, String separator) {
+        Objects.requireNonNull(collection);
+        StringBuilder sb = new StringBuilder();
+        Iterator<T> iter = collection.iterator();
+        while (iter.hasNext()) {
+            sb.append(iter.next());
+            if (iter.hasNext()) {
+                sb.append(separator);
+            }
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Look up a class by name.
+     */
+    public static <T> Class<? extends T> loadClass(String klass, Class<T> base) throws ClassNotFoundException {
+        return Class.forName(klass, true, Utils.getContextCurrentClassLoader()).asSubclass(base);
+    }
+
+    /**
+     * Get current classLoader
+     */
+    public static ClassLoader getCurrentClassLoader() {
+        return Utils.class.getClassLoader();
+    }
+
+
+    /**
+     * get context current class loader
+     * @return
+     */
+    public static ClassLoader getContextCurrentClassLoader() {
+        // use thread classloader first
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        if (cl == null) {
+            return getCurrentClassLoader();
+        }
+        return cl;
+
+    }
+
+    /**
+     * get class
+     * @param key
+     * @return
+     */
+    public static Class<?> getClass(ConnectKeyValue config,final String key) {
+        if (!config.containsKey(key) || StringUtils.isEmpty(config.getString(key))) {
+            throw new ConnectException("");
+        }
+        ClassLoader contextCurrentClassLoader = Utils.getContextCurrentClassLoader();
+        try {
+            Class<?> klass = contextCurrentClassLoader.loadClass(config.getString(key).trim());
+            return Class.forName(klass.getName(), true, contextCurrentClassLoader);
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException("Expected a Class instance or class name. key ["+ key+"], value ["+config.getString(key)+"]");
+        }
+    }
 }