Show pipe plugin loading errors in plugin listing (#17487)

* Show pipe plugin loading errors in plugin listing

Add plugin loading exception messages to pipe plugin metadata and expose them through SHOW PIPEPLUGINS and information_schema.pipe_plugins, so users can diagnose initialization and class loading failures directly from query results.

Made-with: Cursor

* spotless

* spotless

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* spotless

* spotless

* update
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
index 87bc7b0..0c2f3bc 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSyntaxIT.java
@@ -25,6 +25,8 @@
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
@@ -39,14 +41,21 @@
 import org.junit.runner.RunWith;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static org.junit.Assert.fail;
 
@@ -884,4 +893,96 @@
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testShowPipePluginAfterJarDeletedAndClusterRestart() throws Exception {
+    final String pluginName = "TEST_MISSING_JAR_PROCESSOR";
+    final String pluginClassName = "org.apache.iotdb.CountPointProcessor";
+    final Path pluginJarPath =
+        Paths.get(
+                System.getProperty("user.dir"),
+                "src",
+                "test",
+                "resources",
+                "pipe-count-point-processor-example.jar")
+            .toAbsolutePath();
+    System.out.println(pluginJarPath.toUri());
+
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipePlugin %s as '%s' USING URI '%s'",
+              pluginName, pluginClassName, pluginJarPath.toUri()));
+    }
+
+    senderEnv.shutdownAllDataNodes();
+    senderEnv.shutdownAllConfigNodes();
+
+    deletePluginJarUnderConfigNodes(pluginName);
+
+    senderEnv.startAllConfigNodes();
+    senderEnv.startAllDataNodes();
+    ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+
+    boolean pluginFound = false;
+    boolean exceptionMessageFound = false;
+    SQLException lastException = null;
+    for (int retry = 0; retry < 10; retry++) {
+      try (final Connection connection = senderEnv.getConnection();
+          final Statement statement = connection.createStatement();
+          final ResultSet resultSet = statement.executeQuery("show pipeplugins")) {
+        while (resultSet.next()) {
+          if (pluginName.equalsIgnoreCase(resultSet.getString("PluginName"))) {
+            pluginFound = true;
+            final String exceptionMessage = resultSet.getString("ExceptionMessage");
+            exceptionMessageFound = exceptionMessage != null && !exceptionMessage.trim().isEmpty();
+            break;
+          }
+        }
+        lastException = null;
+        break;
+      } catch (final SQLException e) {
+        lastException = e;
+        Thread.sleep(1000);
+      }
+    }
+    if (lastException != null) {
+      throw lastException;
+    }
+
+    Assert.assertTrue("Expected plugin in show pipe plugins result.", pluginFound);
+    Assert.assertTrue(
+        "Expected non-empty ExceptionMessage after deleting plugin jar and restarting cluster.",
+        exceptionMessageFound);
+
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(String.format("drop pipePlugin %s", pluginName));
+    }
+  }
+
+  private void deletePluginJarUnderConfigNodes(final String pluginName) throws IOException {
+    for (final ConfigNodeWrapper configNodeWrapper : senderEnv.getConfigNodeWrapperList()) {
+      final Path pluginJarDirPath =
+          Paths.get(
+              configNodeWrapper.getNodePath(), "ext", "pipe", "install", pluginName.toUpperCase());
+      if (!Files.exists(pluginJarDirPath)) {
+        continue;
+      }
+      try (final Stream<Path> children = Files.walk(pluginJarDirPath)) {
+        children
+            .filter(path -> !path.equals(pluginJarDirPath))
+            .sorted(Comparator.reverseOrder())
+            .forEach(
+                path -> {
+                  try {
+                    Files.deleteIfExists(path);
+                  } catch (final IOException e) {
+                    throw new RuntimeException(e);
+                  }
+                });
+      }
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index 398830e..3ff36d0 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -490,7 +490,8 @@
                   "plugin_name,STRING,TAG,",
                   "plugin_type,STRING,ATTRIBUTE,",
                   "class_name,STRING,ATTRIBUTE,",
-                  "plugin_jar,STRING,ATTRIBUTE,")));
+                  "plugin_jar,STRING,ATTRIBUTE,",
+                  "exception_message,STRING,ATTRIBUTE,")));
       TestUtils.assertResultSetEqual(
           statement.executeQuery("desc topics"),
           "ColumnName,DataType,Category,",
@@ -708,9 +709,9 @@
       TestUtils.assertResultSetEqual(
           statement.executeQuery(
               "select * from pipe_plugins where plugin_name = 'IOTDB-THRIFT-SINK'"),
-          "plugin_name,plugin_type,class_name,plugin_jar,",
+          "plugin_name,plugin_type,class_name,plugin_jar,exception_message,",
           Collections.singleton(
-              "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,"));
+              "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.sink.iotdb.thrift.IoTDBThriftSink,null,null,"));
 
       TestUtils.assertResultSetEqual(
           statement.executeQuery("select * from views"),
diff --git a/integration-test/src/test/resources/pipe-count-point-processor-example.jar b/integration-test/src/test/resources/pipe-count-point-processor-example.jar
new file mode 100644
index 0000000..5db7591
--- /dev/null
+++ b/integration-test/src/test/resources/pipe-count-point-processor-example.jar
Binary files differ
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
index 534b824..93642f5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/plugin/PipePluginTableResp.java
@@ -51,7 +51,7 @@
   public TGetPipePluginTableResp convertToThriftResponse() throws IOException {
     final List<ByteBuffer> pipePluginInformationByteBuffers = new ArrayList<>();
     for (PipePluginMeta pipePluginMeta : allPipePluginMeta) {
-      pipePluginInformationByteBuffers.add(pipePluginMeta.serialize());
+      pipePluginInformationByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
     }
     return new TGetPipePluginTableResp(status, pipePluginInformationByteBuffers);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index c7c1387..30771e7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -29,6 +29,7 @@
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
 import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
@@ -111,6 +112,15 @@
       final String pluginName, final boolean isSetIfNotExistsCondition) {
     // both build-in and user defined pipe plugin should be unique
     if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+      final PipePluginMeta existedPipePluginMeta =
+          pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+      final String loadingFailureMessage = existedPipePluginMeta.getPluginLoadingExceptionMessage();
+      if (loadingFailureMessage != null) {
+        throw new PipeException(
+            String.format(
+                "Failed to create PipePlugin [%s], this PipePlugin exists but failed to load: %s",
+                pluginName, loadingFailureMessage));
+      }
       if (isSetIfNotExistsCondition) {
         return true;
       }
@@ -177,6 +187,7 @@
       LOGGER.info(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
+    checkPipePluginAvailabilityForPipeCreation(sourcePluginName, "source");
 
     final PipeParameters processorParameters = new PipeParameters(processorAttributes);
     final String processorPluginName =
@@ -190,6 +201,7 @@
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
+    checkPipePluginAvailabilityForPipeCreation(processorPluginName, "processor");
 
     final PipeParameters sinkParameters = new PipeParameters(sinkAttributes);
     final String sinkPluginName =
@@ -204,13 +216,14 @@
       LOGGER.warn(exceptionMessage);
       throw new PipeException(exceptionMessage);
     }
+    checkPipePluginAvailabilityForPipeCreation(sinkPluginName, "sink");
   }
 
   /////////////////////////////// Pipe Plugin Management ///////////////////////////////
 
   public TSStatus createPipePlugin(final CreatePipePluginPlan createPipePluginPlan) {
+    final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
     try {
-      final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
       final String pluginName = pipePluginMeta.getPluginName();
       final String className = pipePluginMeta.getClassName();
       final String jarName = pipePluginMeta.getJarName();
@@ -220,6 +233,22 @@
       } else {
         final String existed = pipePluginMetaKeeper.getPluginNameByJarName(jarName);
         if (Objects.nonNull(existed)) {
+          final PipePluginMeta existedPipePluginMeta =
+              pipePluginMetaKeeper.getPipePluginMeta(existed);
+          final String existedLoadingFailureMessage =
+              existedPipePluginMeta.getPluginLoadingExceptionMessage();
+          if (existedLoadingFailureMessage != null) {
+            throw new PipeException(
+                String.format(
+                    "Failed to create PipePlugin [%s], source PipePlugin [%s] failed to load: %s",
+                    pluginName, existed, existedLoadingFailureMessage));
+          }
+          if (!pipePluginExecutableManager.hasPluginFileUnderInstallDir(existed, jarName)) {
+            throw new PipeException(
+                String.format(
+                    "Failed to create PipePlugin [%s], source PipePlugin [%s] jar [%s] does not exist in install dir.",
+                    pluginName, existed, jarName));
+          }
           pipePluginExecutableManager.linkExistedPlugin(existed, pluginName, jarName);
           computeFromPluginClass(pluginName, className);
         } else {
@@ -237,7 +266,7 @@
       pipePluginMetaKeeper.addJarNameAndMd5(jarName, pipePluginMeta.getJarMD5());
 
       return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       final String errorMessage =
           String.format(
               "Failed to execute createPipePlugin(%s) on config nodes, because of %s",
@@ -249,7 +278,7 @@
   }
 
   private void savePipePluginWithRollback(final CreatePipePluginPlan createPipePluginPlan)
-      throws Exception {
+      throws Throwable {
     final PipePluginMeta pipePluginMeta = createPipePluginPlan.getPipePluginMeta();
     final String pluginName = pipePluginMeta.getPluginName();
     final String className = pipePluginMeta.getClassName();
@@ -258,7 +287,7 @@
       pipePluginExecutableManager.savePluginToInstallDir(
           ByteBuffer.wrap(createPipePluginPlan.getJarFile().getValues()), pluginName, jarName);
       computeFromPluginClass(pluginName, className);
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       // We need to rollback if the creation has failed
       pipePluginExecutableManager.removePluginFileUnderLibRoot(pluginName, jarName);
       throw e;
@@ -266,7 +295,7 @@
   }
 
   private void computeFromPluginClass(final String pluginName, final String className)
-      throws Exception {
+      throws Throwable {
     final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
     final PipePluginClassLoader pipePluginClassLoader =
         classLoaderManager.createPipePluginClassLoader(pluginDirPath);
@@ -275,7 +304,7 @@
       pipePluginMetaKeeper.addPipePluginVisibility(
           pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
       classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
-    } catch (final Exception e) {
+    } catch (final Throwable e) {
       try {
         pipePluginClassLoader.close();
       } catch (final Exception ignored) {
@@ -402,37 +431,84 @@
         if (pipePluginMeta.isBuiltin()) {
           continue;
         }
-        final String pluginName = pipePluginMeta.getPluginName();
-        try {
-          final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
-          final PipePluginClassLoader pipePluginClassLoader =
-              classLoaderManager.createPipePluginClassLoader(pluginDirPath);
-          try {
-            final Class<?> pluginClass =
-                Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
-            pipePluginMetaKeeper.addPipePluginVisibility(
-                pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
-            classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
-          } catch (final Throwable e) {
-            try {
-              pipePluginClassLoader.close();
-            } catch (final Exception ignored) {
-            }
-            throw e;
-          }
-        } catch (final Throwable e) {
-          LOGGER.warn(
-              "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
-              pluginName,
-              snapshotFile.getAbsolutePath(),
-              e);
-        }
+        createPipePluginOnStartup(pipePluginMeta, snapshotFile);
       }
     } finally {
       releasePipePluginInfoLock();
     }
   }
 
+  private String getRootCauseMessage(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current.getCause() != null && current.getCause() != current) {
+      current = current.getCause();
+    }
+    final String message = current.getMessage();
+    return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
+  }
+
+  private void checkPipePluginAvailabilityForPipeCreation(
+      final String pluginName, final String pluginType) {
+    final PipePluginMeta pipePluginMeta = pipePluginMetaKeeper.getPipePluginMeta(pluginName);
+    final String loadingFailureMessage = pipePluginMeta.getPluginLoadingExceptionMessage();
+    if (loadingFailureMessage != null) {
+      final String exceptionMessage =
+          String.format(
+              "Failed to create or alter pipe, the pipe %s plugin %s failed to load: %s",
+              pluginType, pluginName, loadingFailureMessage);
+      LOGGER.warn(exceptionMessage);
+      throw new PipeException(exceptionMessage);
+    }
+  }
+
+  private void createPipePluginOnStartup(
+      final PipePluginMeta pipePluginMeta, final File snapshotFile) {
+    final String pluginName = pipePluginMeta.getPluginName();
+    try {
+      final String pluginDirPath = pipePluginExecutableManager.getPluginsDirPath(pluginName);
+      final PipePluginClassLoader pipePluginClassLoader =
+          classLoaderManager.createPipePluginClassLoader(pluginDirPath);
+      try {
+        final Class<?> pluginClass =
+            Class.forName(pipePluginMeta.getClassName(), true, pipePluginClassLoader);
+        pipePluginMetaKeeper.addPipePluginVisibility(
+            pluginName, VisibilityUtils.calculateFromPluginClass(pluginClass));
+        pipePluginMetaKeeper.addPipePluginMeta(
+            pluginName,
+            new PipePluginMeta(
+                pipePluginMeta.getPluginName(),
+                pipePluginMeta.getClassName(),
+                pipePluginMeta.isBuiltin(),
+                pipePluginMeta.getJarName(),
+                pipePluginMeta.getJarMD5(),
+                null));
+        classLoaderManager.addPluginAndClassLoader(pluginName, pipePluginClassLoader);
+      } catch (final Throwable e) {
+        try {
+          pipePluginClassLoader.close();
+        } catch (final Exception ignored) {
+        }
+        throw e;
+      }
+    } catch (final Throwable e) {
+      pipePluginMetaKeeper.addPipePluginMeta(
+          pluginName,
+          new PipePluginMeta(
+              pipePluginMeta.getPluginName(),
+              pipePluginMeta.getClassName(),
+              pipePluginMeta.isBuiltin(),
+              pipePluginMeta.getJarName(),
+              pipePluginMeta.getJarMD5(),
+              getRootCauseMessage(e)));
+      pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
+      LOGGER.warn(
+          "Failed to load plugin class for plugin [{}] when loading snapshot [{}] ",
+          pluginName,
+          snapshotFile.getAbsolutePath(),
+          e);
+    }
+  }
+
   /////////////////////////////// hashCode & equals ///////////////////////////////
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index efbe1ee..ab48a247 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.procedure.impl.pipe.plugin;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
@@ -35,7 +36,6 @@
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -45,6 +45,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -162,8 +163,8 @@
   private Flow executeFromDropOnDataNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromDropOnDataNodes({})", pluginName);
 
-    if (RpcUtils.squashResponseStatusList(env.dropPipePluginOnDataNodes(pluginName, true)).getCode()
-        == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+    final List<TSStatus> dropStatusList = env.dropPipePluginOnDataNodes(pluginName, true);
+    if (dropStatusList.stream().allMatch(this::isDropPipePluginSuccessOrNotExists)) {
       setNextState(DropPipePluginState.DROP_ON_CONFIG_NODES);
       return Flow.HAS_MORE_STATE;
     }
@@ -172,6 +173,18 @@
         String.format("Failed to drop pipe plugin %s on data nodes", pluginName));
   }
 
+  private boolean isDropPipePluginSuccessOrNotExists(final TSStatus status) {
+    if (status == null) {
+      return false;
+    }
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return true;
+    }
+    final String message = status.getMessage();
+    return message != null
+        && (message.contains("does not exist") || message.contains("not been created"));
+  }
+
   private Flow executeFromDropOnConfigNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("DropPipePluginProcedure: executeFromDropOnConfigNodes({})", pluginName);
 
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
index 64551f7..146a6e1 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipePluginTableRespTest.java
@@ -49,7 +49,7 @@
 
     final List<ByteBuffer> pipePluginByteBuffers = new ArrayList<>();
     for (PipePluginMeta pipePluginMeta : pipePluginMetaList) {
-      pipePluginByteBuffers.add(pipePluginMeta.serialize());
+      pipePluginByteBuffers.add(pipePluginMeta.serializeForShowPipePlugin());
     }
     TGetPipePluginTableResp getPipePluginTableResp =
         new TGetPipePluginTableResp(status, pipePluginByteBuffers);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
index d5795610..567893c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgent.java
@@ -24,6 +24,7 @@
 import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoader;
 import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginClassLoaderManager;
 import org.apache.iotdb.commons.pipe.agent.plugin.service.PipePluginExecutableManager;
+import org.apache.iotdb.commons.pipe.datastructure.visibility.Visibility;
 import org.apache.iotdb.commons.pipe.datastructure.visibility.VisibilityUtils;
 import org.apache.iotdb.db.pipe.agent.plugin.dataregion.PipeDataRegionPluginAgent;
 import org.apache.iotdb.db.pipe.agent.plugin.schemaregion.PipeSchemaRegionPluginAgent;
@@ -176,6 +177,30 @@
     }
   }
 
+  public void markPluginLoadFailure(
+      final PipePluginMeta pipePluginMeta, final Throwable throwable) {
+    final String pluginName = pipePluginMeta.getPluginName();
+    pipePluginMetaKeeper.addPipePluginMeta(
+        pluginName,
+        new PipePluginMeta(
+            pipePluginMeta.getPluginName(),
+            pipePluginMeta.getClassName(),
+            pipePluginMeta.isBuiltin(),
+            pipePluginMeta.getJarName(),
+            pipePluginMeta.getJarMD5(),
+            getRootCauseMessage(throwable)));
+    pipePluginMetaKeeper.addPipePluginVisibility(pluginName, Visibility.BOTH);
+  }
+
+  private String getRootCauseMessage(final Throwable throwable) {
+    Throwable current = throwable;
+    while (current.getCause() != null && current.getCause() != current) {
+      current = current.getCause();
+    }
+    final String message = current.getMessage();
+    return current.getClass().getSimpleName() + (message == null ? "" : (": " + message));
+  }
+
   public void deregister(final String pluginName, final boolean needToDeleteJar)
       throws PipeException {
     lock.lock();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 5a408a3..286c8a5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -83,16 +83,20 @@
     }
 
     // create instances of pipe plugins and do registration
-    try {
-      for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
-        if (meta.isBuiltin()) {
-          continue;
-        }
-        PipeDataNodeAgent.plugin().doRegister(meta);
+    for (PipePluginMeta meta : resourcesInformationHolder.getPipePluginMetaList()) {
+      if (meta.isBuiltin()) {
+        continue;
       }
-    } catch (Throwable e) {
-      // Ignore the pipe plugin errors and continue to start
-      LOGGER.warn("Failure when register pipe plugins, will ignore.", e);
+      try {
+        PipeDataNodeAgent.plugin().doRegister(meta);
+      } catch (Throwable e) {
+        PipeDataNodeAgent.plugin().markPluginLoadFailure(meta, e);
+        // Ignore a single broken plugin and continue startup.
+        LOGGER.warn(
+            "Failure when register pipe plugin {}. Skip this plugin and continue startup.",
+            meta.getPluginName(),
+            e);
+      }
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index d7162b0..5b51c00 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -685,7 +685,7 @@
           ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
         iterator =
             client.getPipePluginTable().getAllPipePluginMeta().stream()
-                .map(PipePluginMeta::deserialize)
+                .map(PipePluginMeta::deserializeForShowPipePlugin)
                 .filter(
                     pipePluginMeta ->
                         !BuiltinPipePlugin.SHOW_PIPE_PLUGINS_BLACKLIST.contains(
@@ -706,6 +706,12 @@
       } else {
         columnBuilders[3].appendNull();
       }
+      if (Objects.nonNull(pipePluginMeta.getPluginLoadingExceptionMessage())) {
+        columnBuilders[4].writeBinary(
+            BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage()));
+      } else {
+        columnBuilders[4].appendNull();
+      }
       resultBuilder.declarePosition();
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
index f186702..bffc9fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java
@@ -50,6 +50,7 @@
   public static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External");
 
   private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = BytesUtils.valueOf("");
+  private static final Binary PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD = BytesUtils.valueOf("");
 
   private final ShowPipePluginsStatement showPipePluginsStatement;
 
@@ -74,7 +75,8 @@
     final List<PipePluginMeta> pipePluginMetaList = new ArrayList<>();
     if (allPipePluginsInformation != null) {
       for (final ByteBuffer pipePluginInformationByteBuffer : allPipePluginsInformation) {
-        pipePluginMetaList.add(PipePluginMeta.deserialize(pipePluginInformationByteBuffer));
+        pipePluginMetaList.add(
+            PipePluginMeta.deserializeForShowPipePlugin(pipePluginInformationByteBuffer));
       }
     }
     pipePluginMetaList.sort(Comparator.comparing(PipePluginMeta::getPluginName));
@@ -103,6 +105,12 @@
               pipePluginMeta.getJarName() == null
                   ? PIPE_JAR_NAME_EMPTY_FIELD
                   : BytesUtils.valueOf(pipePluginMeta.getJarName()));
+      builder
+          .getColumnBuilder(4)
+          .writeBinary(
+              pipePluginMeta.getPluginLoadingExceptionMessage() == null
+                  ? PIPE_PLUGIN_EXCEPTION_MESSAGE_EMPTY_FIELD
+                  : BytesUtils.valueOf(pipePluginMeta.getPluginLoadingExceptionMessage()));
       builder.declarePosition();
     }
 
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
index cbb2534..6e87444 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/constructor/PipePluginConstructor.java
@@ -81,6 +81,14 @@
       LOGGER.warn(errorMessage);
       throw new PipeException(errorMessage);
     }
+    if (information.getPluginLoadingExceptionMessage() != null) {
+      final String errorMessage =
+          String.format(
+              "Failed to reflect PipePlugin instance, because PipePlugin %s failed to load: %s",
+              pluginName.toUpperCase(), information.getPluginLoadingExceptionMessage());
+      LOGGER.warn(errorMessage);
+      throw new PipeException(errorMessage);
+    }
 
     try {
       final Class<?> pluginClass =
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
index 0fb2314..19f6863 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/meta/PipePluginMeta.java
@@ -38,9 +38,20 @@
   private final boolean isBuiltin;
   private final String jarName;
   private final String jarMD5;
+  private final String pluginLoadingExceptionMessage;
 
   public PipePluginMeta(
       String pluginName, String className, boolean isBuiltin, String jarName, String jarMD5) {
+    this(pluginName, className, isBuiltin, jarName, jarMD5, null);
+  }
+
+  public PipePluginMeta(
+      String pluginName,
+      String className,
+      boolean isBuiltin,
+      String jarName,
+      String jarMD5,
+      String pluginLoadingExceptionMessage) {
     this.pluginName = Objects.requireNonNull(pluginName).toUpperCase();
     this.className = Objects.requireNonNull(className);
 
@@ -52,6 +63,7 @@
       this.jarName = Objects.requireNonNull(jarName);
       this.jarMD5 = Objects.requireNonNull(jarMD5);
     }
+    this.pluginLoadingExceptionMessage = pluginLoadingExceptionMessage;
   }
 
   public PipePluginMeta(String pluginName, String className) {
@@ -61,6 +73,7 @@
     this.isBuiltin = true;
     this.jarName = null;
     this.jarMD5 = null;
+    this.pluginLoadingExceptionMessage = null;
   }
 
   public boolean isBuiltin() {
@@ -83,6 +96,10 @@
     return jarMD5;
   }
 
+  public String getPluginLoadingExceptionMessage() {
+    return pluginLoadingExceptionMessage;
+  }
+
   public ByteBuffer serialize() throws IOException {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
@@ -90,6 +107,13 @@
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
   }
 
+  public ByteBuffer serializeForShowPipePlugin() throws IOException {
+    PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
+    serializeForShowPipePlugin(outputStream);
+    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
+  }
+
   public void serialize(DataOutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(pluginName, outputStream);
     ReadWriteIOUtils.write(className, outputStream);
@@ -98,13 +122,18 @@
     ReadWriteIOUtils.write(jarMD5, outputStream);
   }
 
+  public void serializeForShowPipePlugin(DataOutputStream outputStream) throws IOException {
+    serialize(outputStream);
+    ReadWriteIOUtils.write(pluginLoadingExceptionMessage, outputStream);
+  }
+
   public static PipePluginMeta deserialize(ByteBuffer byteBuffer) {
     final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
     final String className = ReadWriteIOUtils.readString(byteBuffer);
     final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
     final String jarName = ReadWriteIOUtils.readString(byteBuffer);
     final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
-    return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5);
+    return new PipePluginMeta(pluginName, className, isBuiltin, jarName, jarMD5, null);
   }
 
   public static PipePluginMeta deserialize(InputStream inputStream) throws IOException {
@@ -112,6 +141,17 @@
         ByteBuffer.wrap(ReadWriteIOUtils.readBytesWithSelfDescriptionLength(inputStream)));
   }
 
+  public static PipePluginMeta deserializeForShowPipePlugin(ByteBuffer byteBuffer) {
+    final String pluginName = ReadWriteIOUtils.readString(byteBuffer);
+    final String className = ReadWriteIOUtils.readString(byteBuffer);
+    final boolean isBuiltin = ReadWriteIOUtils.readBool(byteBuffer);
+    final String jarName = ReadWriteIOUtils.readString(byteBuffer);
+    final String jarMD5 = ReadWriteIOUtils.readString(byteBuffer);
+    final String pluginLoadingExceptionMessage = ReadWriteIOUtils.readString(byteBuffer);
+    return new PipePluginMeta(
+        pluginName, className, isBuiltin, jarName, jarMD5, pluginLoadingExceptionMessage);
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (this == obj) {
@@ -150,6 +190,9 @@
         + ", jarMD5='"
         + jarMD5
         + '\''
+        + ", pluginLoadingExceptionMessage='"
+        + pluginLoadingExceptionMessage
+        + '\''
         + '}';
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index f436165..186f7da 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -579,7 +579,8 @@
           new ColumnHeader(PLUGIN_NAME, TSDataType.TEXT),
           new ColumnHeader(PLUGIN_TYPE, TSDataType.TEXT),
           new ColumnHeader(CLASS_NAME, TSDataType.TEXT),
-          new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT));
+          new ColumnHeader(PLUGIN_JAR, TSDataType.TEXT),
+          new ColumnHeader(EXCEPTION_MESSAGE, TSDataType.TEXT));
 
   public static final List<ColumnHeader> showSchemaTemplateHeaders =
       ImmutableList.of(new ColumnHeader(TEMPLATE_NAME, TSDataType.TEXT));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
index 98bf4a9..75b71ef 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
@@ -230,6 +230,9 @@
         new AttributeColumnSchema(ColumnHeaderConstant.CLASS_NAME_TABLE_MODEL, TSDataType.STRING));
     pipePluginTable.addColumnSchema(
         new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_JAR_TABLE_MODEL, TSDataType.STRING));
+    pipePluginTable.addColumnSchema(
+        new AttributeColumnSchema(
+            ColumnHeaderConstant.EXCEPTION_MESSAGE_TABLE_MODEL, TSDataType.STRING));
     schemaTables.put(PIPE_PLUGINS, pipePluginTable);
 
     final TsTable topicTable = new TsTable(TOPICS);