Pipe: avoid useless pipe meta sync (stopped status) to DN to achieve high availability (#11641)

After #11279 https://github.com/apache/iotdb/pull/11279, when a pipe encounters a critical exception on DN, it stops all DR's pipe tasks related to that pipe and modifies the status of the relevant pipe to exception stopped on this DN. Subsequently, the CN collects this information through the heartbeat mechanism and pushes the changes to all DNs. This causes other DNs with running normal pipe tasks to be auto-restarted unnecessarily. This PR addresses this issue.

From now on, there will be a subtle change in the semantics of the CN-side pipe status. When the status is exception stopped, it indicates that the pipe tasks on some (not necessarily all) DRs (DNs) of this pipe encountered a critical exception.

----------

* chore: avoid unused pipe meta sync to DN

* fix: remove PipeHandleMetaChangeProcedure in start pipe

* chore: some logic optimizations

* chore: add comments for stopping pipe
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
index 2d7cd66..ce8eb84 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
@@ -224,7 +224,7 @@
               runtimeMeta.setIsStoppedByRuntimeException(true);
 
               needWriteConsensusOnConfigNodes.set(true);
-              needPushPipeMetaToDataNodes.set(true);
+              needPushPipeMetaToDataNodes.set(false);
 
               LOGGER.warn(
                   "Detect PipeRuntimeCriticalException {} from DataNode, stop pipe {}.",
@@ -253,7 +253,7 @@
                             runtimeMeta.setIsStoppedByRuntimeException(true);
 
                             needWriteConsensusOnConfigNodes.set(true);
-                            needPushPipeMetaToDataNodes.set(true);
+                            needPushPipeMetaToDataNodes.set(false);
 
                             LOGGER.warn(
                                 String.format(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index af5da32..018c234 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -122,13 +122,14 @@
     if (metaSyncStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       boolean successfulSync = !somePipesNeedRestarting;
 
-      if (somePipesNeedRestarting) {
-        final boolean isRestartSuccessful = handleSuccessfulRestartWithLock();
+      if (somePipesNeedRestarting && handleSuccessfulRestartWithLock()) {
         final TSStatus handleMetaChangeStatus =
             procedureManager.pipeHandleMetaChangeWithBlock(true, false);
-        if (isRestartSuccessful
-            && handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (handleMetaChangeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           successfulSync = true;
+        } else {
+          LOGGER.warn(
+              "Failed to handle pipe meta change. Result status: {}.", handleMetaChangeStatus);
         }
       }
 
@@ -139,12 +140,6 @@
       }
     } else {
       LOGGER.warn("Failed to sync pipe meta. Result status: {}.", metaSyncStatus);
-      final TSStatus handleMetaChangeStatus =
-          procedureManager.pipeHandleMetaChangeWithBlock(true, true);
-      if (handleMetaChangeStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.warn(
-            "Failed to handle pipe meta change. Result status: {}.", handleMetaChangeStatus);
-      }
     }
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
index 6f4d03a..f677fec 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/task/PipeTaskCoordinator.java
@@ -100,16 +100,18 @@
 
   /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
   public TSStatus createPipe(TCreatePipeReq req) {
-    return configManager.getProcedureManager().createPipe(req);
+    final TSStatus status = configManager.getProcedureManager().createPipe(req);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn("Failed to create pipe {}. Result status: {}.", req.getPipeName(), status);
+    }
+    return status;
   }
 
   /** Caller should ensure that the method is called in the lock {@link #tryLock()}. */
   public TSStatus startPipe(String pipeName) {
-    final boolean hasException = pipeTaskInfo.hasExceptions(pipeName);
     final TSStatus status = configManager.getProcedureManager().startPipe(pipeName);
-    if (status == RpcUtils.SUCCESS_STATUS && hasException) {
-      LOGGER.info("Pipe {} has started successfully, clear its exceptions.", pipeName);
-      configManager.getProcedureManager().pipeHandleMetaChange(true, true);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn("Failed to start pipe {}. Result status: {}.", pipeName, status);
     }
     return status;
   }
@@ -118,11 +120,24 @@
   public TSStatus stopPipe(String pipeName) {
     final boolean isStoppedByRuntimeException = pipeTaskInfo.isStoppedByRuntimeException(pipeName);
     final TSStatus status = configManager.getProcedureManager().stopPipe(pipeName);
-    if (status == RpcUtils.SUCCESS_STATUS && isStoppedByRuntimeException) {
-      LOGGER.info(
-          "Pipe {} has stopped successfully manually, stop its auto restart process.", pipeName);
-      pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
-      configManager.getProcedureManager().pipeHandleMetaChange(true, true);
+    if (status == RpcUtils.SUCCESS_STATUS) {
+      if (isStoppedByRuntimeException) {
+        // Under normal circumstances, this branch is not executed because when
+        // `isStoppedByRuntimeException` is true, the pipe status is most likely to be STOPPED,
+        // unless this method is called between the execution of `autoRestartWithLock` and
+        // `handleSuccessfulRestartWithLock` in `PipeMetaSyncer` (in this case, the pipe status is
+        // RUNNING and `isStoppedByRuntimeException` is true).
+
+        // Even if return status is success, it doesn't imply the success of the
+        // `executeFromOperateOnDataNodes` phase of stopping pipe. However, we still need to set
+        // `isStoppedByRuntimeException` to false to avoid auto-restart. Meanwhile,
+        // `isStoppedByRuntimeException` does not need to be synchronized with DNs.
+        LOGGER.info("Pipe {} has stopped manually, stop its auto restart process.", pipeName);
+        pipeTaskInfo.setIsStoppedByRuntimeExceptionToFalse(pipeName);
+        configManager.getProcedureManager().pipeHandleMetaChange(true, false);
+      }
+    } else {
+      LOGGER.warn("Failed to stop pipe {}. Result status: {}.", pipeName, status);
     }
     return status;
   }