blob: 8fffecddde1c3acb18860f31d865dee568cf1cd6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.plugin.discovery;
import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.FileFilter;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
private final Path pluginDir;
/**
* Add jar url to classloader. The different engine should have different logic to add url into
* their own classloader
*/
private BiConsumer<ClassLoader, URL> addURLToClassLoader = (classLoader, url) -> {
if (classLoader instanceof URLClassLoader) {
ReflectionUtils.invoke(classLoader, "addURL", url);
} else {
throw new UnsupportedOperationException("can't support custom load jar");
}
};
protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
this.addURLToClassLoader = addURLToClassloader;
LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
public AbstractPluginDiscovery(String pluginSubDir) {
this.pluginDir = Common.connectorJarDir(pluginSubDir);
LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
}
@Override
public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
.map(this::getPluginJarPath)
.filter(Optional::isPresent)
.map(Optional::get).distinct()
.collect(Collectors.toList());
}
@Override
public List<T> getAllPlugins(List<PluginIdentifier> pluginIdentifiers) {
return pluginIdentifiers.stream()
.map(this::createPluginInstance).distinct()
.collect(Collectors.toList());
}
@Override
public T createPluginInstance(PluginIdentifier pluginIdentifier) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
return pluginInstance;
}
Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
// if the plugin jar not exist in classpath, will load from plugin dir.
if (pluginJarPath.isPresent()) {
try {
// use current thread classloader to avoid different classloader load same class error.
this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
} catch (Exception e) {
LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
" message: " + e.getMessage());
classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
}
pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
if (pluginInstance != null) {
LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
return pluginInstance;
}
}
throw new RuntimeException("Plugin " + pluginIdentifier + " not found.");
}
@Nullable
private T loadPluginInstance(PluginIdentifier pluginIdentifier, ClassLoader classLoader) {
ServiceLoader<T> serviceLoader = ServiceLoader.load(getPluginBaseClass(), classLoader);
for (T t : serviceLoader) {
if (t instanceof Plugin) {
// old api
Plugin<?> pluginInstance = (Plugin<?>) t;
if (StringUtils.equalsIgnoreCase(pluginInstance.getPluginName(), pluginIdentifier.getPluginName())) {
return (T) pluginInstance;
}
} else if (t instanceof PluginIdentifierInterface) {
// new api
PluginIdentifierInterface pluginIdentifierInstance = (PluginIdentifierInterface) t;
if (StringUtils.equalsIgnoreCase(pluginIdentifierInstance.getPluginName(), pluginIdentifier.getPluginName())) {
return (T) pluginIdentifierInstance;
}
} else {
throw new UnsupportedOperationException("Plugin instance: " + t + " is not supported.");
}
}
return null;
}
/**
* Get the plugin instance.
*
* @param pluginIdentifier plugin identifier.
* @return plugin instance.
*/
protected Optional<URL> getPluginJarPath(PluginIdentifier pluginIdentifier) {
return pluginJarPath.computeIfAbsent(pluginIdentifier, this::findPluginJarPath);
}
/**
* Get spark plugin interface.
*
* @return plugin base class.
*/
protected abstract Class<T> getPluginBaseClass();
/**
* Find the plugin jar path;
*
* @param pluginIdentifier plugin identifier.
* @return plugin jar path.
*/
private Optional<URL> findPluginJarPath(PluginIdentifier pluginIdentifier) {
if (PLUGIN_JAR_MAPPING.isEmpty()) {
return Optional.empty();
}
final String engineType = pluginIdentifier.getEngineType().toLowerCase();
final String pluginType = pluginIdentifier.getPluginType().toLowerCase();
final String pluginName = pluginIdentifier.getPluginName().toLowerCase();
if (!PLUGIN_JAR_MAPPING.hasPath(engineType)) {
return Optional.empty();
}
Config engineConfig = PLUGIN_JAR_MAPPING.getConfig(engineType);
if (!engineConfig.hasPath(pluginType)) {
return Optional.empty();
}
Config typeConfig = engineConfig.getConfig(pluginType);
Optional<Map.Entry<String, ConfigValue>> optional = typeConfig.entrySet().stream()
.filter(entry -> StringUtils.equalsIgnoreCase(entry.getKey(), pluginName))
.findFirst();
if (!optional.isPresent()) {
return Optional.empty();
}
String pluginJarPrefix = optional.get().getValue().unwrapped().toString();
File[] targetPluginFiles = pluginDir.toFile().listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.getName().endsWith(".jar") && StringUtils.startsWithIgnoreCase(pathname.getName(), pluginJarPrefix);
}
});
if (ArrayUtils.isEmpty(targetPluginFiles)) {
return Optional.empty();
}
try {
URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
LOGGER.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
return Optional.of(pluginJarPath);
} catch (MalformedURLException e) {
LOGGER.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
return Optional.empty();
}
}
}