[ISSUE #230]Optimize plugin loading cloass loader, and support rest API to get connector plugin list (#232)
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Rocketmq replicator running null pointer #205
* Optimize plugin loading cloass loader, and support rest API to get connector plugin list #230
* remove local config
* fixed
* remove invalid references
* fixed
* move getClass method to Utils
diff --git a/rocketmq-connect-runtime/pom.xml b/rocketmq-connect-runtime/pom.xml
index 6571673..bd31d7c 100644
--- a/rocketmq-connect-runtime/pom.xml
+++ b/rocketmq-connect-runtime/pom.xml
@@ -248,5 +248,11 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-artifact</artifactId>
+ <version>3.8.1</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index 3e958d5..6fdb26f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -129,7 +129,6 @@
}
}
Plugin plugin = new Plugin(pluginPaths);
- plugin.initPlugin();
// Create controller and initialize.
ClusterManagementService clusterManagementService = ServiceProviderUtil.getClusterManagementServices(StagingMode.DISTRIBUTED);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index c7adad2..a214ff7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -128,7 +128,6 @@
}
}
Plugin plugin = new Plugin(pluginPaths);
- plugin.initPlugin();
ClusterManagementService clusterManagementService = ServiceProviderUtil.getClusterManagementServices(StagingMode.STANDALONE);
clusterManagementService.initialize(connectConfig);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index f64b9e9..3e5960e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -18,6 +18,10 @@
package org.apache.rocketmq.connect.runtime.common;
import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
+
import java.io.Serializable;
import java.util.Map;
import java.util.Set;
@@ -125,6 +129,14 @@
this.properties = properties;
}
+
+ /**
+ * Gets all original settings with the given prefix.
+ */
+ public Map<String, String> originalsWithPrefix(String prefix) {
+ return originalsWithPrefix(prefix, true);
+ }
+
/**
* Gets all original settings with the given prefix.
* @param prefix the prefix to use as a filter
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index ef3d264..f1e8d7a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -22,6 +22,7 @@
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -31,6 +32,7 @@
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,10 @@
import java.util.List;
import java.util.Set;
+/**
+ * Transform serial actuator, including the initialization of transform
+ * @param <R>
+ */
public class TransformChain<R extends ConnectRecord> implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
@@ -83,7 +89,7 @@
String transformClassKey = PREFIX + transformStr + "-class";
String transformClass = config.getString(transformClassKey);
try {
- Transform transform = getTransform(transformClass);
+ Transform transform = newTransform(transformClass);
KeyValue transformConfig = new DefaultKeyValue();
Set<String> configKeys = config.keySet();
for (String key : configKeys) {
@@ -121,24 +127,21 @@
return connectRecord;
}
- private Transform getTransform(String transformClass) throws Exception {
- ClassLoader loader = plugin.getPluginClassLoader(transformClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class transformClazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- transformClazz = ((PluginClassLoader) loader).loadClass(transformClass, false);
- isolationFlag = true;
- } else {
- transformClazz = Class.forName(transformClass);
- }
- final Transform transform = (Transform) transformClazz.getDeclaredConstructor().newInstance();
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
- Plugin.compareAndSwapLoaders(currentThreadLoader);
- return transform;
+
+ private Transform newTransform(String transformClass) throws Exception {
+ ClassLoader savedLoader = plugin.currentThreadLoader();
+ try {
+ ClassLoader loader = plugin.delegatingLoader().pluginClassLoader(transformClass);
+ savedLoader = Plugin.compareAndSwapLoaders(loader);
+ Class transformClazz = Utils.getContextCurrentClassLoader().loadClass(transformClass);
+ final Transform transform = (Transform) transformClazz.getDeclaredConstructor().newInstance();
+ return transform;
+ } catch (Exception ex) {
+ throw new ConnectException("Load transform failed !!", ex);
+ }finally {
+ Plugin.compareAndSwapLoaders(savedLoader);
+ }
}
/**
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index ca34ca7..369ec9d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -26,6 +26,7 @@
import io.openmessaging.connector.api.component.task.source.SourceTask;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
+import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
@@ -48,6 +49,7 @@
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -183,31 +185,27 @@
}
for (String connectorName : newConnectors.keySet()) {
+ ClassLoader savedLoader = plugin.currentThreadLoader();
try {
ConnectKeyValue keyValue = newConnectors.get(connectorName);
String connectorClass = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
- ClassLoader loader = plugin.getPluginClassLoader(connectorClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class clazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- clazz = ((PluginClassLoader) loader).loadClass(connectorClass, false);
- isolationFlag = true;
- } else {
- clazz = Class.forName(connectorClass);
- }
- final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+ ClassLoader connectorLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
+ savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
+
+ // instance connector
+ final Connector connector = plugin.newConnector(connectorClass);
WorkerConnector workerConnector = new WorkerConnector(connectorName, connector, connectorConfigs.get(connectorName), new DefaultConnectorContext(connectorName, connectController));
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
+
+ // start connector
workerConnector.initialize();
workerConnector.start();
log.info("Connector {} start", workerConnector.getConnectorName());
- Plugin.compareAndSwapLoaders(currentThreadLoader);
this.workingConnectors.add(workerConnector);
} catch (Exception e) {
log.error("worker connector start exception. workerName: " + connectorName, e);
+ } finally {
+ // compare and swap
+ Plugin.compareAndSwapLoaders(savedLoader);
}
}
}
@@ -555,34 +553,32 @@
ClassLoader savedLoader = plugin.currentThreadLoader();
try {
-
String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
- ClassLoader connectorLoader = plugin.getPluginClassLoader(connType);
+ ClassLoader connectorLoader = plugin.delegatingLoader().connectorLoader(connType);
savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
-
+ // new task
final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
-
final Task task = plugin.newTask(taskClass);
- final String valueConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER_DEFAULT);
- final String keyConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER_DEFAULT);
- // new stance
- RecordConverter valueConverter = Class.forName(valueConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
- RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+ /**
+ * create key/value converter
+ */
+ RecordConverter valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
+ RecordConverter keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.CURRENT_CLASSLOADER);
- //value config
- Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, true);
- valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
- valueConverter.configure(valueConverterConfig);
+ if (keyConverter == null) {
+ keyConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
+ log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+ } else {
+ log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
+ }
+ if (valueConverter == null) {
+ valueConverter = plugin.newConverter(keyValue, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, Plugin.ClassLoaderUsage.PLUGINS);
+ log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+ } else {
+ log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
+ }
- //key config
- Map<String, String> keyConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, true);
- keyConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
- keyConverter.configure(keyConverterConfig);
-
-// if (isolationFlag) {
-// Plugin.compareAndSwapLoaders(loader);
-// }
if (task instanceof SourceTask) {
DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
@@ -622,6 +618,7 @@
Plugin.compareAndSwapLoaders(savedLoader);
} catch (Exception e) {
log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
+ Plugin.compareAndSwapLoaders(savedLoader);
}
}
}
@@ -682,23 +679,23 @@
this.pendingTasks.put(workerDirectTask, System.currentTimeMillis());
}
- private Task getTask(String taskClass) throws Exception {
- ClassLoader loader = plugin.getPluginClassLoader(taskClass);
- final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
- Class taskClazz;
- boolean isolationFlag = false;
- if (loader instanceof PluginClassLoader) {
- taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
- isolationFlag = true;
- } else {
- taskClazz = Class.forName(taskClass);
+ private Task getTask(String taskClass){
+ ClassLoader savedLoader = plugin.currentThreadLoader();
+ Task task = null;
+ try {
+ // Get plugin loader
+ ClassLoader taskLoader = plugin.delegatingLoader().pluginClassLoader(taskClass);
+ // Compare and set current loader
+ savedLoader = Plugin.compareAndSwapLoaders(taskLoader);
+ // load class
+ Class taskClazz = Utils.getContextCurrentClassLoader().loadClass(taskClass).asSubclass(Task.class);
+ // new task
+ task = plugin.newTask(taskClazz);
+ } catch (Exception ex ){
+ throw new ConnectException("Create direct task failure", ex);
+ } finally {
+ Plugin.compareAndSwapLoaders(savedLoader);
}
- final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
- if (isolationFlag) {
- Plugin.compareAndSwapLoaders(loader);
- }
-
- Plugin.compareAndSwapLoaders(currentThreadLoader);
return task;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 7ca5a43..3bb6818 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -161,4 +161,7 @@
return connectStatsService;
}
+ public Plugin plugin() {
+ return this.plugin;
+ }
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java
new file mode 100644
index 0000000..6153222
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/DelegatingClassLoader.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.RecordConverter;
+import org.reflections.Configuration;
+import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.sql.Driver;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/**
+ * Delegating class Loader
+ */
+public class DelegatingClassLoader extends URLClassLoader {
+ private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+ private static final String CLASSPATH_NAME = "classpath";
+ private static final String UNDEFINED_VERSION = "undefined";
+
+ private final Map<String, SortedMap<PluginWrapper<?>, ClassLoader>> pluginLoaders;
+ private final Map<String, String> aliases;
+ private final SortedSet<PluginWrapper<SinkConnector>> sinkConnectors;
+ private final SortedSet<PluginWrapper<SourceConnector>> sourceConnectors;
+ private final Collection<PluginWrapper<SourceTask>> sourceTasks;
+ private final Collection<PluginWrapper<SinkTask>> sinkTasks;
+ private final SortedSet<PluginWrapper<RecordConverter>> converters;
+ private final SortedSet<PluginWrapper<Transform<?>>> transformations;
+ private final List<String> pluginPaths;
+
+
+ public DelegatingClassLoader(List<String> pluginPaths, ClassLoader parent) {
+ super(new URL[0], parent);
+ this.pluginPaths = pluginPaths;
+ this.pluginLoaders = new HashMap<>();
+ this.aliases = new HashMap<>();
+ this.sinkConnectors = new TreeSet<>();
+ this.sourceConnectors = new TreeSet<>();
+ this.sourceTasks = new TreeSet<>();
+ this.sinkTasks = new TreeSet<>();
+ this.converters = new TreeSet<>();
+ this.transformations = new TreeSet<>();
+ }
+
+ public DelegatingClassLoader(List<String> pluginPaths) {
+ this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
+ }
+
+ public Set<PluginWrapper<Connector>> connectors() {
+ Set<PluginWrapper<Connector>> connectors = new TreeSet<>((Set) sinkConnectors);
+ connectors.addAll((Set) sourceConnectors);
+ return connectors;
+ }
+
+ public Set<PluginWrapper<SinkConnector>> sinkConnectors() {
+ return sinkConnectors;
+ }
+
+ public Set<PluginWrapper<SourceConnector>> sourceConnectors() {
+ return sourceConnectors;
+ }
+
+ public Set<PluginWrapper<RecordConverter>> converters() {
+ return converters;
+ }
+
+ public Set<PluginWrapper<Transform<?>>> transformations() {
+ return transformations;
+ }
+
+ /**
+ * Retrieve the PluginClassLoader associated with a plugin class
+ * @param name
+ * @return
+ */
+ public PluginClassLoader pluginClassLoader(String name) {
+// if (!PluginUtils.shouldLoadInIsolation(name)) {
+// return null;
+// }
+ SortedMap<PluginWrapper<?>, ClassLoader> inner = pluginLoaders.get(name);
+ if (inner == null) {
+ return null;
+ }
+ ClassLoader pluginLoader = inner.get(inner.lastKey());
+ return pluginLoader instanceof PluginClassLoader
+ ? (PluginClassLoader) pluginLoader
+ : null;
+ }
+
+ public ClassLoader connectorLoader(Connector connector) {
+ return connectorLoader(connector.getClass().getName());
+ }
+
+ public ClassLoader connectorLoader(String connectorClassOrAlias) {
+ String fullName = aliases.containsKey(connectorClassOrAlias)
+ ? aliases.get(connectorClassOrAlias)
+ : connectorClassOrAlias;
+ ClassLoader classLoader = pluginClassLoader(fullName);
+ if (classLoader == null) {
+ classLoader = this;
+ }
+ log.debug(
+ "Getting plugin class loader: '{}' for connector: {}",
+ classLoader,
+ connectorClassOrAlias
+ );
+ return classLoader;
+ }
+
+ private static PluginClassLoader newPluginClassLoader(
+ final URL pluginLocation,
+ final URL[] urls,
+ final ClassLoader parent
+ ) {
+ return AccessController.doPrivileged(
+ (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+ );
+ }
+
+ private <T> void addPlugins(Collection<PluginWrapper<T>> plugins, ClassLoader loader) {
+ for (PluginWrapper<T> plugin : plugins) {
+ String pluginClassName = plugin.className();
+ SortedMap<PluginWrapper<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+ if (inner == null) {
+ inner = new TreeMap<>();
+ pluginLoaders.put(pluginClassName, inner);
+ log.info("Added plugin '{}'", pluginClassName);
+ }
+ inner.put(plugin, loader);
+ }
+ }
+
+ public void initLoaders() {
+ long begin_time = System.currentTimeMillis();
+ for (String configPath : pluginPaths) {
+ initPluginLoader(configPath);
+ }
+ initPluginLoader(CLASSPATH_NAME);
+ addAllAliases();
+ log.info("Init all plugins cost time = {} ms", System.currentTimeMillis()-begin_time);
+ }
+
+ private void initPluginLoader(String path) {
+ try {
+ if (CLASSPATH_NAME.equals(path)) {
+ scanUrlsAndAddPlugins(
+ getParent(),
+ ClasspathHelper.forJavaClassPath().toArray(new URL[0])
+ );
+ } else {
+ Path pluginPath = Paths.get(path).toAbsolutePath();
+ path = pluginPath.toString();
+ if (Files.isDirectory(pluginPath)) {
+ for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
+ registerPlugin(pluginLocation);
+ }
+ } else if (PluginUtils.isArchive(pluginPath)) {
+ registerPlugin(pluginPath);
+ }
+ }
+ } catch (InvalidPathException | MalformedURLException e) {
+ log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
+ } catch (IOException e) {
+ log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
+ } catch (ReflectiveOperationException e) {
+ log.error("Could not instantiate plugins in: {}. Ignoring.", path, e);
+ }
+ }
+
+ private void registerPlugin(Path pluginLocation)
+ throws ReflectiveOperationException, IOException {
+ log.info("Loading plugin from: {}", pluginLocation);
+ List<URL> pluginUrls = new ArrayList<>();
+ for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
+ pluginUrls.add(path.toUri().toURL());
+ }
+ URL[] urls = pluginUrls.toArray(new URL[0]);
+ if (log.isDebugEnabled()) {
+ log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+ }
+ PluginClassLoader loader = newPluginClassLoader(
+ pluginLocation.toUri().toURL(),
+ urls,
+ this
+ );
+ scanUrlsAndAddPlugins(loader, urls);
+ }
+
+ private void scanUrlsAndAddPlugins(
+ ClassLoader loader,
+ URL[] urls
+ ) throws ReflectiveOperationException {
+ long begin_time = System.currentTimeMillis();
+ PluginScanResult plugins = doLoad(loader, urls);
+ log.info("Registered loader: {}, cost time = {} ms", loader, System.currentTimeMillis()-begin_time);
+ if (!plugins.isEmpty()) {
+ addPlugins(plugins.sinkConnectors(), loader);
+ sinkConnectors.addAll(plugins.sinkConnectors());
+ addPlugins(plugins.sourceConnectors(), loader);
+ sourceConnectors.addAll(plugins.sourceConnectors());
+ addPlugins(plugins.sourceTasks(), loader);
+ sourceTasks.addAll(plugins.sourceTasks());
+ addPlugins(plugins.sinkTasks(), loader);
+ sinkTasks.addAll(plugins.sinkTasks());
+ addPlugins(plugins.converters(), loader);
+ converters.addAll(plugins.converters());
+ addPlugins(plugins.transformations(), loader);
+ transformations.addAll(plugins.transformations());
+ }
+ loadJdbcDrivers(loader);
+ }
+
+ private void loadJdbcDrivers(final ClassLoader loader) {
+ AccessController.doPrivileged(
+ new PrivilegedAction<Void>() {
+ @Override
+ public Void run() {
+ ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
+ Driver.class,
+ loader
+ );
+ Iterator<Driver> driversIterator = loadedDrivers.iterator();
+ try {
+ while (driversIterator.hasNext()) {
+ Driver driver = driversIterator.next();
+ log.debug(
+ "Registered java.sql.Driver: {} to java.sql.DriverManager",
+ driver
+ );
+ }
+ } catch (Throwable t) {
+ log.debug(
+ "Ignoring java.sql.Driver classes listed in resources but not"
+ + " present in class loader's classpath: ",
+ t
+ );
+ }
+ return null;
+ }
+ }
+ );
+ }
+
+ private PluginScanResult doLoad(
+ ClassLoader loader,
+ URL[] urls
+ ) throws ReflectiveOperationException {
+ ConfigurationBuilder builder = new ConfigurationBuilder();
+ builder.setClassLoaders(new ClassLoader[]{loader});
+ builder.addUrls(urls);
+ builder.setScanners(new SubTypesScanner());
+ builder.useParallelExecutor();
+ Reflections reflections = new InternalReflections(builder);
+
+ return new PluginScanResult(
+ getPluginWrapper(reflections, SourceConnector.class, loader),
+ getPluginWrapper(reflections, SinkConnector.class, loader),
+ getPluginWrapper(reflections, SourceTask.class, loader),
+ getPluginWrapper(reflections, SinkTask.class, loader),
+ getPluginWrapper(reflections, RecordConverter.class, loader),
+ getTransformationPluginWrapper(loader, reflections)
+ );
+ }
+
+ private Collection<PluginWrapper<Transform<?>>> getTransformationPluginWrapper(ClassLoader loader, Reflections reflections) throws ReflectiveOperationException {
+ return (Collection<PluginWrapper<Transform<?>>>) (Collection<?>) getPluginWrapper(reflections, Transform.class, loader);
+ }
+
+ private <T> Collection<PluginWrapper<T>> getPluginWrapper(
+ Reflections reflections,
+ Class<T> klass,
+ ClassLoader loader
+ ) throws InstantiationException, IllegalAccessException {
+ Set<Class<? extends T>> plugins;
+ try {
+ plugins = reflections.getSubTypesOf(klass);
+ } catch (ReflectionsException e) {
+ log.debug("Reflections scanner could not find any classes for URLs: " +
+ reflections.getConfiguration().getUrls(), e);
+ return Collections.emptyList();
+ }
+
+ Collection<PluginWrapper<T>> result = new ArrayList<>();
+ for (Class<? extends T> plugin : plugins) {
+ if (PluginUtils.isConcrete(plugin)) {
+ result.add(new PluginWrapper<>(plugin, versionFor(plugin), loader));
+ } else {
+ log.debug("Skipping {} as it is not concrete implementation", plugin);
+ }
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Collection<PluginWrapper<T>> getServiceLoaderPluginWrapper(Class<T> klass, ClassLoader loader) {
+ ClassLoader savedLoader = Plugin.compareAndSwapLoaders(loader);
+ Collection<PluginWrapper<T>> result = new ArrayList<>();
+ try {
+ ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
+ for (T pluginImpl : serviceLoader) {
+ result.add(new PluginWrapper<>((Class<? extends T>) pluginImpl.getClass(),
+ versionFor(pluginImpl), loader));
+ }
+ } finally {
+ Plugin.compareAndSwapLoaders(savedLoader);
+ }
+ return result;
+ }
+
+ private static <T> String versionFor(T pluginImpl) {
+// return pluginImpl instanceof Versioned ? ((Versioned) pluginImpl).version() : UNDEFINED_VERSION;
+ return UNDEFINED_VERSION;
+ }
+
+ private static <T> String versionFor(Class<? extends T> pluginKlass) throws IllegalAccessException, InstantiationException {
+ // Temporary workaround until all the plugins are versioned.
+ return Connector.class.isAssignableFrom(pluginKlass) ? versionFor(pluginKlass.newInstance()) : UNDEFINED_VERSION;
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
+ PluginClassLoader pluginLoader = pluginClassLoader(fullName);
+ if (pluginLoader != null) {
+ log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
+ return pluginLoader.loadClass(fullName, resolve);
+ }
+
+ return super.loadClass(fullName, resolve);
+ }
+
+ private void addAllAliases() {
+ addAliases(sourceConnectors);
+ addAliases(sinkConnectors);
+ addAliases(sourceTasks);
+ addAliases(sinkTasks);
+ addAliases(converters);
+ addAliases(transformations);
+ }
+
+ private <S> void addAliases(Collection<PluginWrapper<S>> plugins) {
+ for (PluginWrapper<S> plugin : plugins) {
+ if (PluginUtils.isAliasUnique(plugin, plugins)) {
+ String simple = PluginUtils.simpleName(plugin);
+ String pruned = PluginUtils.prunedName(plugin);
+ aliases.put(simple, plugin.className());
+ if (simple.equals(pruned)) {
+ log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
+ } else {
+ aliases.put(pruned, plugin.className());
+ log.info(
+ "Added aliases '{}' and '{}' to plugin '{}'",
+ simple,
+ pruned,
+ plugin.className()
+ );
+ }
+ }
+ }
+ }
+
+ private static class InternalReflections extends Reflections {
+
+ public InternalReflections(Configuration configuration) {
+ super(configuration);
+ }
+
+ // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
+ // as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
+ @Override
+ protected void scan(URL url) {
+ try {
+ super.scan(url);
+ } catch (ReflectionsException e) {
+ Logger log = Reflections.log;
+ if (log != null && log.isWarnEnabled()) {
+ log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
+ }
+ }
+ }
+ }
+
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
index 66350da..7841eef 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
@@ -19,170 +19,65 @@
import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
-import java.io.IOException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
-import org.reflections.Configuration;
-import org.reflections.Reflections;
-import org.reflections.ReflectionsException;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ConfigurationBuilder;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Plugin extends URLClassLoader {
+public class Plugin {
+
+ public enum ClassLoaderUsage {
+ CURRENT_CLASSLOADER,
+ PLUGINS
+ }
+
private static final Logger log = LoggerFactory.getLogger(Plugin.class);
+ private final DelegatingClassLoader delegatingLoader;
- private final List<String> pluginPaths;
-
- private Map<String, PluginWrapper> classLoaderMap = new HashMap<>();
-
- public Plugin(List<String> pluginPaths) {
- this(pluginPaths, Plugin.class.getClassLoader());
+ public Plugin(List<String> pluginLocations) {
+ delegatingLoader = newDelegatingClassLoader(pluginLocations);
+ delegatingLoader.initLoaders();
}
- public Plugin(List<String> pluginPaths, ClassLoader parent) {
- super(new URL[0], parent);
- this.pluginPaths = pluginPaths;
+ public void initLoaders(){
+ delegatingLoader.initLoaders();
}
- public void initPlugin() {
- for (String configPath : pluginPaths) {
- loadPlugin(configPath);
- }
- }
-
- private void loadPlugin(String path) {
- Path pluginPath = Paths.get(path).toAbsolutePath();
- path = pluginPath.toString();
- try {
- if (Files.isDirectory(pluginPath)) {
- for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
- registerPlugin(pluginLocation);
- }
- } else if (PluginUtils.isArchive(pluginPath)) {
- registerPlugin(pluginPath);
- }
- } catch (IOException e) {
- log.error("register plugin error, path: {}, e: {}", path, e);
- }
- }
-
- private void doLoad(
- ClassLoader loader,
- URL[] urls
- ) {
- ConfigurationBuilder builder = new ConfigurationBuilder();
- builder.setClassLoaders(new ClassLoader[] {loader});
- builder.addUrls(urls);
- builder.setScanners(new SubTypesScanner());
- builder.useParallelExecutor();
- Reflections reflections = new PluginReflections(builder);
- getPlugin(reflections, Connector.class, loader);
- getPlugin(reflections, Task.class, loader);
- getPlugin(reflections, Transform.class, loader);
- }
-
- private <T> Collection<Class<? extends T>> getPlugin(
- Reflections reflections,
- Class<T> klass,
- ClassLoader loader
- ) {
- Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);
- Collection<Class<? extends T>> result = new ArrayList<>();
- for (Class<? extends T> plugin : plugins) {
- classLoaderMap.put(plugin.getName(), new PluginWrapper(plugin, loader));
- result.add(plugin);
-
- }
- return result;
- }
-
- private static class PluginReflections extends Reflections {
-
- public PluginReflections(Configuration configuration) {
- super(configuration);
- }
-
- @Override
- protected void scan(URL url) {
- try {
- super.scan(url);
- } catch (ReflectionsException e) {
- Logger log = Reflections.log;
- if (log != null && log.isWarnEnabled()) {
- log.warn("Scan url error. ignoring the exception and continuing", e);
- }
- }
- }
- }
-
- private static PluginClassLoader newPluginClassLoader(
- final URL pluginLocation,
- final URL[] urls,
- final ClassLoader parent
- ) {
+ protected DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
return AccessController.doPrivileged(
- (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+ (PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(paths)
);
}
- private void registerPlugin(Path pluginLocation)
- throws IOException {
- log.info("Loading plugin from: {}", pluginLocation);
- List<URL> pluginUrls = new ArrayList<>();
- for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
- pluginUrls.add(path.toUri().toURL());
- }
- URL[] urls = pluginUrls.toArray(new URL[0]);
- if (log.isDebugEnabled()) {
- log.debug("Loading plugin urls: {}", Arrays.toString(urls));
- }
- PluginClassLoader loader = newPluginClassLoader(
- pluginLocation.toUri().toURL(),
- urls,
- this
- );
- doLoad(loader, urls);
+ public DelegatingClassLoader delegatingLoader() {
+ return delegatingLoader;
+ }
+ public Set<PluginWrapper<SinkConnector>> sinkConnectors() {
+ return delegatingLoader.sinkConnectors();
}
- public ClassLoader getPluginClassLoader(String pluginName) {
- PluginWrapper pluginWrapper = classLoaderMap.get(pluginName);
- if (null != pluginWrapper) {
- return pluginWrapper.getClassLoader();
- }
- return null;
+ public Set<PluginWrapper<SourceConnector>> sourceConnectors() {
+ return delegatingLoader.sourceConnectors();
}
- public Task newTask(Class<? extends Task> taskClass) {
- return newPlugin(taskClass);
+ public Set<PluginWrapper<RecordConverter>> converters() {
+ return delegatingLoader.converters();
}
- protected static <T> T newPlugin(Class<T> klass) {
- // KAFKA-8340: The thread classloader is used during static initialization and must be
- // set to the plugin's classloader during instantiation
- ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
- try {
- return klass.getDeclaredConstructor().newInstance();
- } catch (Throwable t) {
- throw new ConnectException("Instantiation error", t);
- } finally {
- compareAndSwapLoaders(savedLoader);
- }
+ public Set<PluginWrapper<Transform<?>>> transformations() {
+ return delegatingLoader.transformations();
}
public ClassLoader currentThreadLoader() {
@@ -191,10 +86,183 @@
public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
ClassLoader current = Thread.currentThread().getContextClassLoader();
- if (null != current && !current.equals(loader)) {
+ if (!current.equals(loader)) {
Thread.currentThread().setContextClassLoader(loader);
}
return current;
}
+ protected static <U> Class<? extends U> pluginClass(
+ DelegatingClassLoader loader,
+ String classOrAlias,
+ Class<U> pluginClass
+ ) throws ClassNotFoundException {
+ Class<?> klass = loader.loadClass(classOrAlias, false);
+ if (pluginClass.isAssignableFrom(klass)) {
+ return (Class<? extends U>) klass;
+ }
+
+ throw new ClassNotFoundException(
+ "Requested class: "
+ + classOrAlias
+ + " does not extend " + pluginClass.getSimpleName()
+ );
+ }
+
+ public Connector newConnector(String connectorClassOrAlias) {
+ Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
+ return newPlugin(klass);
+ }
+
+ public Class<? extends Connector> connectorClass(String connectorClassOrAlias) {
+ Class<? extends Connector> klass;
+ try {
+ klass = pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class);
+ } catch (ClassNotFoundException e) {
+ List<PluginWrapper<? extends Connector>> matches = new ArrayList<>();
+ Set<PluginWrapper<Connector>> connectors = delegatingLoader.connectors();
+ for (PluginWrapper<? extends Connector> plugin : connectors) {
+ Class<?> pluginClass = plugin.pluginClass();
+ String simpleName = pluginClass.getSimpleName();
+ if (simpleName.equals(connectorClassOrAlias)
+ || simpleName.equals(connectorClassOrAlias + "Connector")) {
+ matches.add(plugin);
+ }
+ }
+ if (matches.isEmpty()) {
+ throw new ConnectException(
+ "Failed to find any class that implements Connector and which name matches "
+ + connectorClassOrAlias
+ + ", available connectors are: "
+ + Utils.join(connectors, ", ")
+ );
+ }
+
+ // conflict connector
+ if (matches.size() > 1) {
+ throw new ConnectException(
+ "More than one connector matches alias "
+ + connectorClassOrAlias
+ + ". Please use full package and class name instead. Classes found: "
+ + Utils.join(connectors, ", ")
+ );
+ }
+
+ PluginWrapper<? extends Connector> entry = matches.get(0);
+ klass = entry.pluginClass();
+ }
+ return klass;
+ }
+
+ public Task newTask(Class<? extends Task> taskClass) {
+ return newPlugin(taskClass);
+ }
+
+ public RecordConverter newConverter(ConnectKeyValue config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
+ if (!config.containsKey(classPropertyName)) {
+ return null;
+ }
+ Class<? extends RecordConverter> klass = null;
+ switch (classLoaderUsage) {
+ case CURRENT_CLASSLOADER:
+ klass = pluginClassFromConfig(config, classPropertyName, RecordConverter.class, delegatingLoader.converters());
+ break;
+ case PLUGINS:
+ String converterClassOrAlias = Utils.getClass(config,classPropertyName).getName();
+ try {
+ klass = pluginClass(delegatingLoader, converterClassOrAlias, RecordConverter.class);
+ } catch (ClassNotFoundException e) {
+ throw new ConnectException(
+ "Failed to find any class that implements Converter and which name matches "
+ + converterClassOrAlias + ", available converters are: "
+ + pluginNames(delegatingLoader.converters())
+ );
+ }
+ break;
+ }
+ if (klass == null) {
+ throw new ConnectException("Unable to initialize the Converter specified in '" + classPropertyName + "'");
+ }
+
+ // Configure the Converter using only the old configuration mechanism ...
+ String configPrefix = classPropertyName + ".";
+ Map<String, String> converterConfig = config.originalsWithPrefix(configPrefix);
+ log.debug("Configuring the converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+ RecordConverter plugin;
+ ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+ try {
+ plugin = newPlugin(klass);
+ plugin.configure(converterConfig);
+ } finally {
+ compareAndSwapLoaders(savedLoader);
+ }
+ return plugin;
+ }
+
+
+ public <T> List<T> newPlugins(List<String> klassNames, ConnectKeyValue config, Class<T> pluginKlass) {
+ List<T> plugins = new ArrayList<>();
+ if (klassNames != null) {
+ for (String klassName : klassNames) {
+ plugins.add(newPlugin(klassName, config, pluginKlass));
+ }
+ }
+ return plugins;
+ }
+
+ protected static <T> T newPlugin(Class<T> klass) {
+ ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+ try {
+ return Utils.newInstance(klass);
+ } catch (Throwable t) {
+ throw new ConnectException("Instantiation error", t);
+ } finally {
+ compareAndSwapLoaders(savedLoader);
+ }
+ }
+
+
+ public <T> T newPlugin(String klassName, ConnectKeyValue config, Class<T> pluginKlass) {
+ T plugin;
+ Class<? extends T> klass;
+ try {
+ klass = pluginClass(delegatingLoader, klassName, pluginKlass);
+ } catch (ClassNotFoundException e) {
+ String msg = String.format("Failed to find any class that implements %s and which "
+ + "name matches %s", pluginKlass, klassName);
+ throw new ConnectException(msg);
+ }
+ ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+ try {
+ plugin = newPlugin(klass);
+ } finally {
+ compareAndSwapLoaders(savedLoader);
+ }
+ return plugin;
+ }
+
+ private static <T> String pluginNames(Collection<PluginWrapper<T>> plugins) {
+ return Utils.join(plugins, ", ");
+ }
+
+
+ protected <U> Class<? extends U> pluginClassFromConfig(
+ ConnectKeyValue config,
+ String propertyName,
+ Class<U> pluginClass,
+ Collection<PluginWrapper<U>> plugins
+ ) {
+ Class<?> klass = Utils.getClass(config, propertyName);
+ if (pluginClass.isAssignableFrom(klass)) {
+ return (Class<? extends U>) klass;
+ }
+ throw new ConnectException(
+ "Failed to find any class that implements " + pluginClass.getSimpleName()
+ + " for the config "
+ + propertyName + ", available classes are: "
+ + pluginNames(plugins)
+ );
+ }
+
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
index c5419b4..6924e7a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
@@ -21,6 +21,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A custom classloader dedicated to loading Connect plugin classes in classloading isolation.
+ */
public class PluginClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
private final URL pluginLocation;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java
new file mode 100644
index 0000000..ad7868d
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginScanResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.RecordConverter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+
+/**
+ * plugin scan result
+ */
+public class PluginScanResult {
+ private final Collection<PluginWrapper<SourceConnector>> sourceConnectors;
+ private final Collection<PluginWrapper<SinkConnector>> sinkConnectors;
+ private final Collection<PluginWrapper<SourceTask>> sourceTasks;
+ private final Collection<PluginWrapper<SinkTask>> sinkTasks;
+ private final Collection<PluginWrapper<RecordConverter>> converters;
+ private final Collection<PluginWrapper<Transform<?>>> transformations;
+
+ private final List<Collection> allPlugins;
+
+ public PluginScanResult(
+ Collection<PluginWrapper<SourceConnector>> sourceConnectors,
+ Collection<PluginWrapper<SinkConnector>> sinkConnectors,
+ Collection<PluginWrapper<SourceTask>> sourceTasks,
+ Collection<PluginWrapper<SinkTask>> sinkTasks,
+ Collection<PluginWrapper<RecordConverter>> converters,
+ Collection<PluginWrapper<Transform<?>>> transformations
+ ) {
+ this.sinkConnectors = sinkConnectors;
+ this.sourceConnectors = sourceConnectors;
+ this.sourceTasks = sourceTasks;
+ this.sinkTasks = sinkTasks;
+ this.converters = converters;
+ this.transformations = transformations;
+ this.allPlugins =
+ Arrays.asList(sinkConnectors, sourceConnectors, sourceTasks, sinkTasks, converters, transformations);
+ }
+
+ public Collection<PluginWrapper<SinkConnector>> sinkConnectors() {
+ return sinkConnectors;
+ }
+
+ public Collection<PluginWrapper<SourceConnector>> sourceConnectors() {
+ return sourceConnectors;
+ }
+
+ public Collection<PluginWrapper<SourceTask>> sourceTasks() {
+ return sourceTasks;
+ }
+
+ public Collection<PluginWrapper<SinkTask>> sinkTasks() {
+ return sinkTasks;
+ }
+
+
+ public Collection<PluginWrapper<RecordConverter>> converters() {
+ return converters;
+ }
+
+
+ public Collection<PluginWrapper<Transform<?>>> transformations() {
+ return transformations;
+ }
+
+ public boolean isEmpty() {
+ boolean isEmpty = true;
+ for (Collection plugins : allPlugins) {
+ isEmpty = isEmpty && plugins.isEmpty();
+ }
+ return isEmpty;
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java
new file mode 100644
index 0000000..ede4607
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginType.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.controller.isolation;
+
+import io.openmessaging.connector.api.component.Transform;
+import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.data.RecordConverter;
+
+import java.util.Locale;
+
+/**
+ * plugin type
+ */
+public enum PluginType {
+ SOURCE(SourceConnector.class),
+ SINK(SinkConnector.class),
+ CONNECTOR(Connector.class),
+ CONVERTER(RecordConverter.class),
+ TRANSFORMATION(Transform.class),
+ UNKNOWN(Object.class);
+
+ private Class<?> klass;
+
+ PluginType(Class<?> klass) {
+ this.klass = klass;
+ }
+
+ public static PluginType from(Class<?> klass) {
+ for (PluginType type : PluginType.values()) {
+ if (type.klass.isAssignableFrom(klass)) {
+ return type;
+ }
+ }
+ return UNKNOWN;
+ }
+
+ public String simpleName() {
+ return klass.getSimpleName();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString().toLowerCase(Locale.ROOT);
+ }
+}
\ No newline at end of file
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index d46a7e1..4a804ca 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -17,11 +17,13 @@
package org.apache.rocketmq.connect.runtime.controller.isolation;
import java.io.IOException;
+import java.lang.reflect.Modifier;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -120,6 +122,16 @@
+ ")\\..*$"
+ "|io\\.openmessaging\\.KeyValue");
+
+ // If the base interface or class that will be used to identify Connect plugins resides within
+ // the same java package as the plugins that need to be loaded in isolation (and thus are
+ // added to the INCLUDE pattern), then this base interface or class needs to be excluded in the
+ // regular expression pattern
+ private static final Pattern INCLUDE = Pattern.compile("^(?:"
+ + "|org\\.apache\\.rocketmq\\.connect"
+ + ")\\..*$");
+
+
private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream
.Filter<Path>() {
@Override
@@ -152,6 +164,83 @@
return locations;
}
+ /**
+ * Verify the given class corresponds to a concrete class and not to an abstract class or
+ */
+ public static boolean isConcrete(Class<?> klass) {
+ int mod = klass.getModifiers();
+ return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
+ }
+
+ /**
+ * Return whether the class with the given name should be loaded in isolation using a plugin
+ * classloader.
+ *
+ * @param name the fully qualified name of the class.
+ * @return true if this class should be loaded in isolation, false otherwise.
+ */
+ public static boolean shouldLoadInIsolation(String name) {
+ return !(BLACKLIST.matcher(name).matches() && !INCLUDE.matcher(name).matches());
+ }
+ /**
+ * Verify whether a given plugin's alias matches another alias in a collection of plugins.
+ *
+ * @param alias the plugin descriptor to test for alias matching.
+ * @param plugins the collection of plugins to test against.
+ * @param <U> the plugin type.
+ * @return false if a match was found in the collection, otherwise true.
+ */
+ public static <U> boolean isAliasUnique(
+ PluginWrapper<U> alias,
+ Collection<PluginWrapper<U>> plugins
+ ) {
+ boolean matched = false;
+ for (PluginWrapper<U> plugin : plugins) {
+ if (simpleName(alias).equals(simpleName(plugin))
+ || prunedName(alias).equals(prunedName(plugin))) {
+ if (matched) {
+ return false;
+ }
+ matched = true;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Return the simple class name of a plugin as {@code String}.
+ *
+ * @param plugin the plugin descriptor.
+ * @return the plugin's simple class name.
+ */
+ public static String simpleName(PluginWrapper<?> plugin) {
+ return plugin.pluginClass().getSimpleName();
+ }
+
+ /**
+ * Remove the plugin type name at the end of a plugin class name, if such suffix is present.
+ * This method is meant to be used to extract plugin aliases.
+ */
+ public static String prunedName(PluginWrapper<?> plugin) {
+ switch (plugin.type()) {
+ case SOURCE:
+ case SINK:
+ case CONNECTOR:
+ return prunePluginName(plugin, "Connector");
+ default:
+ return prunePluginName(plugin, plugin.type().simpleName());
+ }
+ }
+
+ private static String prunePluginName(PluginWrapper<?> plugin, String suffix) {
+ String simple = plugin.pluginClass().getSimpleName();
+ int pos = simple.lastIndexOf(suffix);
+ if (pos > 0) {
+ return simple.substring(0, pos);
+ }
+ return simple;
+ }
+
public static List<Path> pluginUrls(Path topPath) throws IOException {
boolean containsClassFiles = false;
Set<Path> archives = new TreeSet<>();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
index 070f6ce..0e713fc 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
@@ -16,19 +16,26 @@
*/
package org.apache.rocketmq.connect.runtime.controller.isolation;
-import io.openmessaging.connector.api.component.task.sink.SinkConnector;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
-import java.util.Locale;
-public class PluginWrapper<T> {
+import org.apache.maven.artifact.versioning.DefaultArtifactVersion;
+
+import java.util.Objects;
+
+/**
+ * @param <T>
+ */
+public class PluginWrapper<T> implements Comparable<PluginWrapper<T>>{
private final Class<? extends T> klass;
private final String name;
private final PluginType type;
private final String typeName;
private final String location;
+ private final String version;
+ private final DefaultArtifactVersion encodedVersion;
+
private final ClassLoader classLoader;
- public PluginWrapper(Class<? extends T> klass, ClassLoader loader) {
+ public PluginWrapper(Class<? extends T> klass, String version, ClassLoader loader) {
this.klass = klass;
this.name = klass.getName();
this.type = PluginType.from(klass);
@@ -37,39 +44,60 @@
this.location = loader instanceof PluginClassLoader
? ((PluginClassLoader) loader).location()
: "classpath";
+ this.version = version != null ? version : "null";
+ this.encodedVersion = new DefaultArtifactVersion(this.version);
}
public ClassLoader getClassLoader() {
return this.classLoader;
}
- public enum PluginType {
- SOURCE(SourceConnector.class),
- SINK(SinkConnector.class),
- UNKNOWN(Object.class);
+ public Class<? extends T> pluginClass() {
+ return klass;
+ }
- private Class<?> klass;
+ public String className() {
+ return name;
+ }
- PluginType(Class<?> klass) {
- this.klass = klass;
+ public String version() {
+ return version;
+ }
+
+ public PluginType type() {
+ return type;
+ }
+
+ public String typeName() {
+ return typeName;
+ }
+
+ public String location() {
+ return location;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- public static PluginType from(Class<?> klass) {
- for (PluginType type : PluginType.values()) {
- if (type.klass.isAssignableFrom(klass)) {
- return type;
- }
- }
- return UNKNOWN;
+ if (!(o instanceof PluginWrapper)) {
+ return false;
}
+ PluginWrapper<?> that = (PluginWrapper<?>) o;
+ return Objects.equals(klass, that.klass) &&
+ Objects.equals(version, that.version) &&
+ type == that.type;
+ }
- public String simpleName() {
- return klass.getSimpleName();
- }
+ @Override
+ public int hashCode() {
+ return Objects.hash(klass, version, type);
+ }
- @Override
- public String toString() {
- return super.toString().toLowerCase(Locale.ROOT);
- }
+ @Override
+ public int compareTo(PluginWrapper other) {
+ int nameComp = name.compareTo(other.name);
+ return nameComp != 0 ? nameComp : encodedVersion.compareTo(other.encodedVersion);
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
index 644f024..cc75862 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/errors/ProcessingContext.java
@@ -71,8 +71,6 @@
}
/**
- * Set the record consumed from Kafka in a sink connector.
- *
* @param consumedMessage the record
*/
public void consumerRecord(MessageExt consumedMessage) {
@@ -80,9 +78,7 @@
reset();
}
- /**
- * @return the record consumed from Kafka. could be null
- */
+
public MessageExt consumerRecord() {
return consumedMessage;
}
@@ -96,7 +92,6 @@
/**
* Set the source record being processed in the connect pipeline.
- *
* @param record the source record
*/
public void sourceRecord(ConnectRecord record) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
new file mode 100644
index 0000000..fd2c13e
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/ConnectorPluginsResource.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest;
+
+import com.alibaba.fastjson.JSON;
+import io.javalin.Context;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginType;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginWrapper;
+import org.apache.rocketmq.connect.runtime.rest.entities.ConfigKeyInfo;
+import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ConnectorPluginsResource {
+
+ private final AbstractConnectController connectController;
+ private final List<PluginInfo> connectorPlugins;
+
+ static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList();
+ static final List<Class<? extends SourceConnector>> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList();
+
+
+ public ConnectorPluginsResource(AbstractConnectController connectController) {
+ this.connectController = connectController;
+ this.connectorPlugins = new ArrayList<>();
+
+ // TODO: improve once plugins are allowed to be added/removed during runtime.
+ addConnectorPlugins(connectController.plugin().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
+ addConnectorPlugins(connectController.plugin().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES);
+ addConnectorPlugins(connectController.plugin().transformations(), new ArrayList<>());
+ addConnectorPlugins(connectController.plugin().converters(), Collections.emptySet());
+ }
+
+ private <T> void addConnectorPlugins(Collection<PluginWrapper<T>> plugins, Collection<Class<? extends T>> excludes) {
+ plugins.stream()
+ .filter(p -> !excludes.contains(p.pluginClass()))
+ .map(PluginInfo::new)
+ .forEach(connectorPlugins::add);
+ }
+
+
+ /**
+ * validate plugin configs
+ * @param context
+ * @throws Throwable
+ */
+ public void validateConfigs(Context context) {
+ // No-op
+ }
+
+ /**
+ * list connector plugins
+ * @param context
+ * @return
+ */
+ public void listConnectorPlugins(Context context) {
+ boolean connectorsOnly = context.anyFormParamNull("connectorsOnly")
+ ? false : Boolean.parseBoolean(context.pathParam("connectorsOnly")) ;
+ synchronized (this) {
+ if (connectorsOnly) {
+ List<PluginInfo> pluginInfos = Collections.unmodifiableList(connectorPlugins.stream()
+ .filter(p -> PluginType.SINK.toString().equals(p.getType()) || PluginType.SOURCE.toString().equals(p.getType()))
+ .collect(Collectors.toList()));
+ context.result(JSON.toJSONString(pluginInfos));
+ } else {
+ context.result(JSON.toJSONString(Collections.unmodifiableList(connectorPlugins)));
+ }
+ }
+ }
+
+ /**
+ * Get connector config def
+ * @param context
+ * @return
+ */
+ public List<ConfigKeyInfo> getConnectorConfigDef(Context context) {
+ // No-op
+ return Collections.emptyList();
+ }
+
+ /**
+ * reload plugins
+ * @param context
+ */
+ public void reloadPlugins(Context context) {
+ connectController.getConfigManagementService().getPlugin().initLoaders();
+ context.result("success");
+ }
+}
+
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
index 1eda51f..c49d5a7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java
@@ -47,8 +47,12 @@
private static final String TASK_CONFIGS = "taskConfigs";
+ /** connector plugin resource */
+ private ConnectorPluginsResource pluginsResource;
public RestHandler(AbstractConnectController connectController) {
this.connectController = connectController;
+ pluginsResource = new ConnectorPluginsResource(connectController);
+
Javalin app = Javalin.create();
app.enableCaseSensitiveUrls();
app = app.start(connectController.getConnectConfig().getHttpPort());
@@ -70,7 +74,13 @@
app.get("/getConfigInfo", this::getConfigInfo);
app.get("/getAllocatedConnectors", this::getAllocatedConnectors);
app.get("/getAllocatedTasks", this::getAllocatedTasks);
- app.get("/plugin/reload", this::reloadPlugins);
+
+ /**plugin resource handler*/
+ app.get("/plugin/reload", context -> pluginsResource.reloadPlugins(context));
+ app.get("/plugin/list", context -> pluginsResource.listConnectorPlugins(context));
+ app.get("/plugin/config", context -> pluginsResource.getConnectorConfigDef(context));
+ app.get("/plugin/config/validate", context -> pluginsResource.validateConfigs(context));
+
}
@@ -244,8 +254,4 @@
return result;
}
- private void reloadPlugins(Context context) {
- connectController.getConfigManagementService().getPlugin().initPlugin();
- context.result("success");
- }
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java
new file mode 100644
index 0000000..851ec47
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/ConfigKeyInfo.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest.entities;
+
+
+import java.util.List;
+import java.util.Objects;
+
+public class ConfigKeyInfo {
+
+ private String name;
+ private String type;
+ private boolean required;
+ private String defaultValue;
+ private String importance;
+ private String documentation;
+ private String group;
+ private int orderInGroup;
+ private String width;
+ private String displayName;
+ private List<String> dependents;
+
+ public ConfigKeyInfo(String name,
+ String type,
+ boolean required,
+ String defaultValue,
+ String importance,
+ String documentation,
+ String group,
+ int orderInGroup,
+ String width,
+ String displayName,
+ List<String> dependents) {
+ this.name = name;
+ this.type = type;
+ this.required = required;
+ this.defaultValue = defaultValue;
+ this.importance = importance;
+ this.documentation = documentation;
+ this.group = group;
+ this.orderInGroup = orderInGroup;
+ this.width = width;
+ this.displayName = displayName;
+ this.dependents = dependents;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public boolean isRequired() {
+ return required;
+ }
+
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ public String getImportance() {
+ return importance;
+ }
+
+ public String getDocumentation() {
+ return documentation;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public int getOrderInGroup() {
+ return orderInGroup;
+ }
+
+ public String getWidth() {
+ return width;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public List<String> getDependents() {
+ return dependents;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ConfigKeyInfo that = (ConfigKeyInfo) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(type, that.type) &&
+ Objects.equals(required, that.required) &&
+ Objects.equals(defaultValue, that.defaultValue) &&
+ Objects.equals(importance, that.importance) &&
+ Objects.equals(documentation, that.documentation) &&
+ Objects.equals(group, that.group) &&
+ Objects.equals(orderInGroup, that.orderInGroup) &&
+ Objects.equals(width, that.width) &&
+ Objects.equals(displayName, that.displayName) &&
+ Objects.equals(dependents, that.dependents);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("[")
+ .append(name)
+ .append(",")
+ .append(type)
+ .append(",")
+ .append(required)
+ .append(",")
+ .append(defaultValue)
+ .append(",")
+ .append(importance)
+ .append(",")
+ .append(documentation)
+ .append(",")
+ .append(group)
+ .append(",")
+ .append(orderInGroup)
+ .append(",")
+ .append(width)
+ .append(",")
+ .append(displayName)
+ .append(",")
+ .append(dependents)
+ .append("]");
+ return sb.toString();
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
new file mode 100644
index 0000000..007b97c
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/entities/PluginInfo.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.rest.entities;
+
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginType;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginWrapper;
+
+import java.util.Objects;
+
+/**
+ * plugin info
+ */
+public class PluginInfo {
+ private String className;
+ private PluginType type;
+ private String version;
+
+ public PluginInfo(String className, PluginType type, String version) {
+ this.className = className;
+ this.type = type;
+ this.version = version;
+ }
+
+ public PluginInfo(PluginWrapper<?> plugin) {
+ this(plugin.className(), plugin.type(), plugin.version());
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public void setClassName(String className) {
+ this.className = className;
+ }
+
+ public PluginType getType() {
+ return type;
+ }
+
+ public void setType(PluginType type) {
+ this.type = type;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof PluginInfo)) return false;
+ PluginInfo that = (PluginInfo) o;
+ return Objects.equals(className, that.className) && type == that.type && Objects.equals(version, that.version);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(className, type, version);
+ }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
index 8792593..346d5d0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementService.java
@@ -25,8 +25,6 @@
*/
public interface ClusterManagementService {
- Long WORKER_TIME_OUT = 30 * 1000L;
-
void initialize(ConnectConfig connectConfig);
/**
@@ -69,8 +67,16 @@
*/
void registerListener(WorkerStatusListener listener);
+ /**
+ * get current run worker
+ * @return
+ */
String getCurrentWorker();
+ /**
+ * staging mode
+ * @return
+ */
StagingMode getStagingMode();
interface WorkerStatusListener {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index a37fea0..6496c97 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -177,7 +177,7 @@
}
String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
- ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
+ ClassLoader classLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
Class clazz;
if (null != classLoader) {
clazz = Class.forName(connectorClass, true, classLoader);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 06256ad..f5a8c0f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -18,6 +18,7 @@
import io.openmessaging.connector.api.component.connector.Connector;
+import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -27,6 +28,7 @@
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,19 +131,22 @@
}
}
+ ClassLoader savedLoader = plugin.currentThreadLoader();
String connectorClass = configs.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
- ClassLoader classLoader = plugin.getPluginClassLoader(connectorClass);
- Class clazz;
- if (null != classLoader) {
- clazz = Class.forName(connectorClass, true, classLoader);
- } else {
- clazz = Class.forName(connectorClass);
+ ClassLoader connectLoader = plugin.delegatingLoader().pluginClassLoader(connectorClass);
+ savedLoader = Plugin.compareAndSwapLoaders(connectLoader);
+ try {
+ Class clazz = Utils.getContextCurrentClassLoader().loadClass(connectorClass);
+ final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
+ connector.validate(configs);
+ connector.start(configs);
+ connectorKeyValueStore.put(connectorName, configs);
+ recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
+ } catch (Exception ex) {
+ throw new ConnectException(ex);
+ } finally {
+ Plugin.compareAndSwapLoaders(savedLoader);
}
- final Connector connector = (Connector) clazz.getDeclaredConstructor().newInstance();
- connector.validate(configs);
- connector.start(configs);
- connectorKeyValueStore.put(connectorName, configs);
- recomputeTaskConfigs(connectorName, connector, currentTimestamp, configs);
return "";
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
index 79f52c5..5aea5f1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Utils.java
@@ -17,9 +17,17 @@
package org.apache.rocketmq.connect.runtime.utils;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+
/**
* common utils
*/
@@ -52,4 +60,83 @@
}
}
+ /**
+ * Instantiate the class
+ */
+ public static <T> T newInstance(Class<T> c) {
+ if (c == null) {
+ throw new ConnectException("class cannot be null");
+ }
+ try {
+ return c.getDeclaredConstructor().newInstance();
+ } catch (NoSuchMethodException e) {
+ throw new ConnectException("Could not find a public no-argument constructor for " + c.getName(), e);
+ } catch (ReflectiveOperationException | RuntimeException e) {
+ throw new ConnectException("Could not instantiate class " + c.getName(), e);
+ }
+ }
+
+ public static <T> String join(T[] strs, String separator) {
+ return join(Arrays.asList(strs), separator);
+ }
+
+ public static <T> String join(Collection<T> collection, String separator) {
+ Objects.requireNonNull(collection);
+ StringBuilder sb = new StringBuilder();
+ Iterator<T> iter = collection.iterator();
+ while (iter.hasNext()) {
+ sb.append(iter.next());
+ if (iter.hasNext()) {
+ sb.append(separator);
+ }
+ }
+ return sb.toString();
+ }
+
+ /**
+ * Look up a class by name.
+ */
+ public static <T> Class<? extends T> loadClass(String klass, Class<T> base) throws ClassNotFoundException {
+ return Class.forName(klass, true, Utils.getContextCurrentClassLoader()).asSubclass(base);
+ }
+
+ /**
+ * Get current classLoader
+ */
+ public static ClassLoader getCurrentClassLoader() {
+ return Utils.class.getClassLoader();
+ }
+
+
+ /**
+ * get context current class loader
+ * @return
+ */
+ public static ClassLoader getContextCurrentClassLoader() {
+ // use thread classloader first
+ ClassLoader cl = Thread.currentThread().getContextClassLoader();
+ if (cl == null) {
+ return getCurrentClassLoader();
+ }
+ return cl;
+
+ }
+
+ /**
+ * get class
+ * @param key
+ * @return
+ */
+ public static Class<?> getClass(ConnectKeyValue config,final String key) {
+ if (!config.containsKey(key) || StringUtils.isEmpty(config.getString(key))) {
+ throw new ConnectException("");
+ }
+ ClassLoader contextCurrentClassLoader = Utils.getContextCurrentClassLoader();
+ try {
+ Class<?> klass = contextCurrentClassLoader.loadClass(config.getString(key).trim());
+ return Class.forName(klass.getName(), true, contextCurrentClassLoader);
+ } catch (ClassNotFoundException e) {
+ throw new ConnectException("Expected a Class instance or class name. key ["+ key+"], value ["+config.getString(key)+"]");
+ }
+ }
}