Show pipe plugin loading errors in plugin listing (#17487)
* Show pipe plugin loading errors in plugin listing
Record the exception message of a failed plugin load in the pipe plugin metadata and expose it as a new ExceptionMessage column in SHOW PIPEPLUGINS and a new exception_message column in information_schema.pipe_plugins, so users can diagnose initialization and class-loading failures directly from query results.
Made-with: Cursor
* spotless
* spotless
* fix
* fix
* fix
* fix
* fix
* fix
* fix
* fix
* spotless
* spotless
* update
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 87bc7b0..0c2f3bc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -25,6 +25,8 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
@@ -39,14 +41,21 @@
import org.junit.runner.RunWith;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.Assert.fail;
@@ -884,4 +893,96 @@
fail(e.getMessage());
}
}
+
+ @Test
+ public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exception {
+ final String pluginName = "TEST_MISSING_JAR_PROCESSOR";
+ final String pluginClassName = "org.apache.iotdb.CountPointProcessor";
+ final Path pluginJarPath =
+ Paths.get(
+ System.getProperty("user.dir"),
+ "src",
+ "test",
+ "resources",
+ "pipe-count-point-processor-example.jar")
+ .toAbsolutePath();
+ System.out.println(pluginJarPath.toUri());
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipePlugin %s as '%s' USING URI '%s'",
+ pluginName, pluginClassName, pluginJarPath.toUri()));
+ }
+
+ senderEnv.shutdownAllDataNodes();
+ senderEnv.shutdownAllConfigNodes();
+
+ deletePluginJarUnderConfigNodes(pluginName);
+
+ senderEnv.startAllConfigNodes();
+ senderEnv.startAllDataNodes();
+ ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+
+ boolean pluginFound = false;
+ boolean exceptionMessageFound = false;
+ SQLException lastException = null;
+ for (int retry = 0; retry < 10; retry++) {
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement();
+ final ResultSet resultSet = statement.executeQuery("show pipeplugins")) {
+ while (resultSet.next()) {
+ if (pluginName.equalsIgnoreCase(resultSet.getString("PluginName"))) {
+ pluginFound = true;
+ final String exceptionMessage = resultSet.getString("ExceptionMessage");
+ exceptionMessageFound = exceptionMessage != null && !exceptionMessage.trim().isEmpty();
+ break;
+ }
+ }
+ lastException = null;
+ break;
+ } catch (final SQLException e) {
+ lastException = e;
+ Thread.sleep(1000);
+ }
+ }
+ if (lastException != null) {
+ throw lastException;
+ }
+
+ Assert.assertTrue("Expected plugin in show pipe plugins result.", pluginFound);
+ Assert.assertTrue(
+ "Expected non-empty ExceptionMessage after deleting plugin jar and restarting cluster.",
+ exceptionMessageFound);
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(String.format("drop pipePlugin %s", pluginName));
+ }
+ }
+
+ private void deletePluginJarUnderConfigNodes(final String pluginName) throws IOException {
+ for (final ConfigNodeWrapper configNodeWrapper : senderEnv.getConfigNodeWrapperList()) {
+ final Path pluginJarDirPath =
+ Paths.get(
+ configNodeWrapper.getNodePath(), "ext", "pipe", "install", pluginName.toUpperCase());
+ if (!Files.exists(pluginJarDirPath)) {
+ continue;
+ }
+ try (final Stream<Path> children = Files.walk(pluginJarDirPath)) {
+ children
+ .filter(path -> !path.equals(pluginJarDirPath))
+ .sorted(Comparator.reverseOrder())
+ .forEach(
+ path -> {
+ try {
+ Files.deleteIfExists(path);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ }
+ }
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index 398830e..3ff36d0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -490,7 +490,8 @@
"plugin_name,STRING,TAG,",
"plugin_type,STRING,ATTRIBUTE,",
"class_name,STRING,ATTRIBUTE,",
- "plugin_jar,STRING,ATTRIBUTE,")));
+ "plugin_jar,STRING,ATTRIBUTE,",
+ "exception_message,STRING,ATTRIBUTE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("desc topics"),
"ColumnName,DataType,Category,",
@@ -708,9 +709,9 @@
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"),
- "plugin_name,plugin_type,class_name,plugin_jar,",
+ "plugin_name,plugin_type,class_name,plugin_jar,exception_message,",
Collections.singleton(
- "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,"));
+ "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,"));
TestUtils.assertResultSetEqual(
statement.executeQuery("select * from views"),
diff --git a/integration-test/src/test/resources/pipe-count-point-processor-example.jar b/integration-test/src/test/resources/pipe-count-point-processor-example.jar
new file mode 100644
index 0000000..5db7591
--- /dev/null
+++ b/integration-test/src/test/resources/pipe-count-point-processor-example.jar
Binary files differ
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
index 534b824..93642f5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
@@ -51,7 +51,7 @@
public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
final List<ByteBuffer> pipePluginInformationByteBuffers = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : allPipePluginMeta) {
- pipePluginInformationByteBuffers.add(pipePluginMeta.serialize());
+ pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
}
return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers);
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index c7c1387..30771e7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -29,6 +29,7 @@
import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -111,6 +112,15 @@
final String pluginName, final boolean isSetIfNotExistsCondition) {
// both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ final PipePluginMeta existedPipePluginMeta =
+ pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+ final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage();
+ if (loadingFailureMessage != null) {
+ throw new PipeException(
+ String.format(
+ "Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s",
+ pluginName, loadingFailureMessage));
+ }
if (isSetIfNotExistsCondition) {
return true;
}
@@ -177,6 +187,7 @@
LOGGER.info(exceptionMessage);
throw new PipeException(exceptionMessage);
}
+ checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source");
final PipeParameters processorParameters = new PipeParameters(processorAttributes);
final String processorPluginName =
@@ -190,6 +201,7 @@
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
+ checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor");
final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
final String sinkPluginName =
@@ -204,13 +216,14 @@
LOGGER.warn(exceptionMessage);
throw new PipeException(exceptionMessage);
}
+ checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink");
}
/////////////////////////////// Pipe Plugin Management ///////////////////////////////
public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) {
+ final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
try {
- final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
final String jarName = pipePluginMeta.getJarName();
@@ -220,6 +233,22 @@
} else {
final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
if (Objects.nonNull(existed)) {
+ final PipePluginMeta existedPipePluginMeta =
+ pipePluginMetaKeeper.getPipePluginMeta(existed);
+ final String existedLoadingFailureMessage =
+ existedPipePluginMeta.getPluginLoadingExceptionMessage();
+ if (existedLoadingFailureMessage != null) {
+ throw new PipeException(
+ String.format(
+ "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s",
+ pluginName, existed, existedLoadingFailureMessage));
+ }
+ if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) {
+ throw new PipeException(
+ String.format(
+ "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.",
+ pluginName, existed, jarName));
+ }
pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName);
computeFromPluginClass(pluginName, className);
} else {
@@ -237,7 +266,7 @@
pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- } catch (final Exception e) {
+ } catch (final Throwable e) {
final String errorMessage =
String.format(
"Failed to execute createPipePlugin(%s) on config nodes, because of %s",
@@ -249,7 +278,7 @@
}
private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
- throws Exception {
+ throws Throwable {
final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
final String pluginName = pipePluginMeta.getPluginName();
final String className = pipePluginMeta.getClassName();
@@ -258,7 +287,7 @@
pipePluginExecutableManager.savePluginToInstallDir(
ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
computeFromPluginClass(pluginName, className);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
// We need to rollback if the creation has failed
pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
throw e;
@@ -266,7 +295,7 @@
}
private void computeFromPluginClass(final String pluginName, final String className)
- throws Exception {
+ throws Throwable {
final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
final PipePluginClassLoader pipePluginClassLoader =
classLoaderManager.createPipePluginClassLoader(pluginDirPath);
@@ -275,7 +304,7 @@
pipePluginMetaKeeper.addPipePluginVisibility(
pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
- } catch (final Exception e) {
+ } catch (final Throwable e) {
try {
pipePluginClassLoader.close();
} catch (final Exception ignored) {
@@ -402,37 +431,84 @@
if (pipePluginMeta.isBuiltin()) {
continue;
}
- final String pluginName = pipePluginMeta.getPluginName();
- try {
- final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
- final PipePluginClassLoader pipePluginClassLoader =
- classLoaderManager.createPipePluginClassLoader(pluginDirPath);
- try {
- final Class<?> pluginClass =
- Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
- pipePluginMetaKeeper.addPipePluginVisibility(
- pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
- classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
- } catch (final Throwable e) {
- try {
- pipePluginClassLoader.close();
- } catch (final Exception ignored) {
- }
- throw e;
- }
- } catch (final Throwable e) {
- LOGGER.warn(
- "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
- pluginName,
- snapshotFile.getAbsolutePath(),
- e);
- }
+ createPipePluginOnStartup(pipePluginMeta, snapshotFile);
}
} finally {
releasePipePluginInfoLock();
}
}
+ private String getRootCauseMessage(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current.getCause() != null && current.getCause() != current) {
+ current = current.getCause();
+ }
+ final String message = current.getMessage();
+ return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
+ }
+
+ private void checkPipePluginAvailabilityForPipeCreation(
+ final String pluginName, final String pluginType) {
+ final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+ final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage();
+ if (loadingFailureMessage != null) {
+ final String exceptionMessage =
+ String.format(
+ "Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s",
+ pluginType, pluginName, loadingFailureMessage);
+ LOGGER.warn(exceptionMessage);
+ throw new PipeException(exceptionMessage);
+ }
+ }
+
+ private void createPipePluginOnStartup(
+ final PipePluginMeta pipePluginMeta, final File snapshotFile) {
+ final String pluginName = pipePluginMeta.getPluginName();
+ try {
+ final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
+ final PipePluginClassLoader pipePluginClassLoader =
+ classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+ try {
+ final Class<?> pluginClass =
+ Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
+ pipePluginMetaKeeper.addPipePluginVisibility(
+ pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+ pipePluginMetaKeeper.addPipePluginMeta(
+ pluginName,
+ new PipePluginMeta(
+ pipePluginMeta.getPluginName(),
+ pipePluginMeta.getClassName(),
+ pipePluginMeta.isBuiltin(),
+ pipePluginMeta.getJarName(),
+ pipePluginMeta.getJarMD5(),
+ null));
+ classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
+ } catch (final Throwable e) {
+ try {
+ pipePluginClassLoader.close();
+ } catch (final Exception ignored) {
+ }
+ throw e;
+ }
+ } catch (final Throwable e) {
+ pipePluginMetaKeeper.addPipePluginMeta(
+ pluginName,
+ new PipePluginMeta(
+ pipePluginMeta.getPluginName(),
+ pipePluginMeta.getClassName(),
+ pipePluginMeta.isBuiltin(),
+ pipePluginMeta.getJarName(),
+ pipePluginMeta.getJarMD5(),
+ getRootCauseMessage(e)));
+ pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
+ LOGGER.warn(
+ "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
+ pluginName,
+ snapshotFile.getAbsolutePath(),
+ e);
+ }
+ }
+
/////////////////////////////// hashCode & equals ///////////////////////////////
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index efbe1ee..ab48a247 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
@@ -35,7 +36,6 @@
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -45,6 +45,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@@ -162,8 +163,8 @@
private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName);
- if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, true)).getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ final List<TSStatus> dropStatusList = env.dropPipePluginOnDataNodes(pluginName, true);
+ if (dropStatusList.stream().allMatch(this::isDropPipePluginSuccessOrNotExists)) {
setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
return Flow.HAS_MORE_STATE;
}
@@ -172,6 +173,18 @@
String.format("Failed to drop pipe plugin %s on data nodes", pluginName));
}
+ private boolean isDropPipePluginSuccessOrNotExists(final TSStatus status) {
+ if (status == null) {
+ return false;
+ }
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return true;
+ }
+ final String message = status.getMessage();
+ return message != null
+ && (message.contains("does not exist") || message.contains("not been created"));
+ }
+
private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName);
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
index 64551f7..146a6e1 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
@@ -49,7 +49,7 @@
final List<ByteBuffer> pipePluginByteBuffers = new ArrayList<>();
for (PipePluginMeta pipePluginMeta : pipePluginMetaList) {
- pipePluginByteBuffers.add(pipePluginMeta.serialize());
+ pipePluginByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
}
TGetPipePluginTableResp getPipePluginTableResp =
new TGetPipePluginTableResp(status, pipePluginByteBuffers);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index d5795610..567893c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -24,6 +24,7 @@
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoaderManager;
import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent;
@@ -176,6 +177,30 @@
}
}
+ public void markPluginLoadFailure(
+ final PipePluginMeta pipePluginMeta, final Throwable throwable) {
+ final String pluginName = pipePluginMeta.getPluginName();
+ pipePluginMetaKeeper.addPipePluginMeta(
+ pluginName,
+ new PipePluginMeta(
+ pipePluginMeta.getPluginName(),
+ pipePluginMeta.getClassName(),
+ pipePluginMeta.isBuiltin(),
+ pipePluginMeta.getJarName(),
+ pipePluginMeta.getJarMD5(),
+ getRootCauseMessage(throwable)));
+ pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
+ }
+
+ private String getRootCauseMessage(final Throwable throwable) {
+ Throwable current = throwable;
+ while (current.getCause() != null && current.getCause() != current) {
+ current = current.getCause();
+ }
+ final String message = current.getMessage();
+ return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
+ }
+
public void deregister(final String pluginName, final boolean needToDeleteJar)
throws PipeException {
lock.lock();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 5a408a3..286c8a5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -83,16 +83,20 @@
}
// create instances of pipe plugins and do registration
- try {
- for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
- if (meta.isBuiltin()) {
- continue;
- }
- PipeDataNodeAgent.plugin().doRegister(meta);
+ for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+ if (meta.isBuiltin()) {
+ continue;
}
- } catch (Throwable e) {
- // Ignore the pipe plugin errors and continue to start
- LOGGER.warn("Failure when register pipe plugins, will ignore.", e);
+ try {
+ PipeDataNodeAgent.plugin().doRegister(meta);
+ } catch (Throwable e) {
+ PipeDataNodeAgent.plugin().markPluginLoadFailure(meta, e);
+ // Ignore a single broken plugin and continue startup.
+ LOGGER.warn(
+ "Failure when register pipe plugin {}. Skip this plugin and continue startup.",
+ meta.getPluginName(),
+ e);
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index d7162b0..5b51c00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -685,7 +685,7 @@
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
iterator =
client.getPipePluginTable().getAllPipePluginMeta().stream()
- .map(PipePluginMeta::deserialize)
+ .map(PipePluginMeta::deserializeForShowPipePlugin)
.filter(
pipePluginMeta ->
!BuiltinPipePlugin.SHOW_PIPE_PLUGINS_BLACKLIST.contains(
@@ -706,6 +706,12 @@
} else {
columnBuilders[3].appendNull();
}
+ if (Objects.nonNull(pipePluginMeta.getPluginLoadingExceptionMessage())) {
+ columnBuilders[4].writeBinary(
+ BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage()));
+ } else {
+ columnBuilders[4].appendNull();
+ }
resultBuilder.declarePosition();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
index f186702..bffc9fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -50,6 +50,7 @@
public static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External");
private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = BytesUtils.valueOf("");
+ private static final Binary PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD = BytesUtils.valueOf("");
private final ShowPipePluginsStatement showPipePluginsStatement;
@@ -74,7 +75,8 @@
final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
if (allPipePluginsInformation != null) {
for (final ByteBuffer pipePluginInformationByteBuffer : allPipePluginsInformation) {
- pipePluginMetaList.add(PipePluginMeta.deserialize(pipePluginInformationByteBuffer));
+ pipePluginMetaList.add(
+ PipePluginMeta.deserializeForShowPipePlugin(pipePluginInformationByteBuffer));
}
}
pipePluginMetaList.sort(Comparator.comparing(PipePluginMeta::getPluginName));
@@ -103,6 +105,12 @@
pipePluginMeta.getJarName() == null
? PIPE_JAR_NAME_EMPTY_FIELD
: BytesUtils.valueOf(pipePluginMeta.getJarName()));
+ builder
+ .getColumnBuilder(4)
+ .writeBinary(
+ pipePluginMeta.getPluginLoadingExceptionMessage() == null
+ ? PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD
+ : BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage()));
builder.declarePosition();
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
index cbb2534..6e87444 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
@@ -81,6 +81,14 @@
LOGGER.warn(errorMessage);
throw new PipeException(errorMessage);
}
+ if (information.getPluginLoadingExceptionMessage() != null) {
+ final String errorMessage =
+ String.format(
+ "Failed to reflect PipePlugin instance, because PipePlugin %s failed to load: %s",
+ pluginName.toUpperCase(), information.getPluginLoadingExceptionMessage());
+ LOGGER.warn(errorMessage);
+ throw new PipeException(errorMessage);
+ }
try {
final Class<?> pluginClass =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
index 0fb2314..19f6863 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
@@ -38,9 +38,20 @@
private final boolean isBuiltin;
private final String jarName;
private final String jarMD5;
+ private final String pluginLoadingExceptionMessage;
public PipePluginMeta(
String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) {
+ this(pluginName, className, isBuiltin, jarName, jarMD5, null);
+ }
+
+ public PipePluginMeta(
+ String pluginName,
+ String className,
+ boolean isBuiltin,
+ String jarName,
+ String jarMD5,
+ String pluginLoadingExceptionMessage) {
this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
this.className = Objects.requireNonNull(className);
@@ -52,6 +63,7 @@
this.jarName = Objects.requireNonNull(jarName);
this.jarMD5 = Objects.requireNonNull(jarMD5);
}
+ this.pluginLoadingExceptionMessage = pluginLoadingExceptionMessage;
}
public PipePluginMeta(String pluginName, String className) {
@@ -61,6 +73,7 @@
this.isBuiltin = true;
this.jarName = null;
this.jarMD5 = null;
+ this.pluginLoadingExceptionMessage = null;
}
public boolean isBuiltin() {
@@ -83,6 +96,10 @@
return jarMD5;
}
+ public String getPluginLoadingExceptionMessage() {
+ return pluginLoadingExceptionMessage;
+ }
+
public ByteBuffer serialize() throws IOException {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
@@ -90,6 +107,13 @@
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
+ public ByteBuffer serializeForShowPipePlugin() throws IOException {
+ PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+ serializeForShowPipePlugin(outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+ }
+
public void serialize(DataOutputStream outputStream) throws IOException {
ReadWriteIOUtils.write(pluginName, outputStream);
ReadWriteIOUtils.write(className, outputStream);
@@ -98,13 +122,18 @@
ReadWriteIOUtils.write(jarMD5, outputStream);
}
+ public void serializeForShowPipePlugin(DataOutputStream outputStream) throws IOException {
+ serialize(outputStream);
+ ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream);
+ }
+
public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
final String className = ReadWriteIOUtils.readString(byteBuffer);
final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
final String jarName = ReadWriteIOUtils.readString(byteBuffer);
final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
- return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5);
+ return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null);
}
public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -112,6 +141,17 @@
ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
}
+ public static PipePluginMeta deserializeForShowPipePlugin(ByteBuffer byteBuffer) {
+ final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+ final String className = ReadWriteIOUtils.readString(byteBuffer);
+ final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
+ final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+ final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+ final String pluginLoadingExceptionMessage = ReadWriteIOUtils.readString(byteBuffer);
+ return new PipePluginMeta(
+ pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage);
+ }
+
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -150,6 +190,9 @@
+ ", jarMD5='"
+ jarMD5
+ '\''
+ + ", pluginLoadingExceptionMessage='"
+ + pluginLoadingExceptionMessage
+ + '\''
+ '}';
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index f436165..186f7da 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -579,7 +579,8 @@
new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
- new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT));
+ new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT),
+ new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT));
public static final List<ColumnHeader> showSchemaTemplateHeaders =
ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
index 98bf4a9..75b71ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
@@ -230,6 +230,9 @@
new AttributeColumnSchema(ColumnHeaderConstant.CLASS_NAME_TABLE_MODEL, TSDataType.STRING));
pipePluginTable.addColumnSchema(
new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_JAR_TABLE_MODEL, TSDataType.STRING));
+ pipePluginTable.addColumnSchema(
+ new AttributeColumnSchema(
+ ColumnHeaderConstant.EXCEPTION_MESSAGE_TABLE_MODEL, TSDataType.STRING));
schemaTables.put(PIPE_PLUGINS, pipePluginTable);
final TsTable topicTable = new TsTable(TOPICS);