sub
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 867f04d..a0855d7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -87,6 +87,9 @@
       throws SubscriptionException {
     LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
 
+    alterConsumerGroupProcedure = null;
+    createPipeProcedures = new ArrayList<>();
+
     subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
 
     final String consumerId = subscribeReq.getConsumerId();
@@ -161,8 +164,7 @@
       response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
-    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && response.getSubStatusSize() > 0) {
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new SubscriptionException(
           String.format(
               "Failed to create subscription with request %s on config nodes, because %s",
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index 6741a6c..6f668f2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -85,6 +85,9 @@
       throws SubscriptionException {
     LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
 
+    alterConsumerGroupProcedure = null;
+    dropPipeProcedures = new ArrayList<>();
+
     subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq);
 
     // Construct AlterConsumerGroupProcedure
@@ -141,8 +144,7 @@
       response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
-    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-        && response.getSubStatusSize() > 0) {
+    if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new SubscriptionException(
           String.format(
               "Failed to drop subscription with request %s on config nodes, because %s",
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
index 93d9941..e2a4d06 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedureTest.java
@@ -19,18 +19,36 @@
 
 package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
+import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.PermissionManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
+import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator;
+import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.CreatePipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
 import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -39,6 +57,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -102,4 +121,119 @@
       fail();
     }
   }
+
+  /** A non-success top-level consensus status must surface as a SubscriptionException. */
+  @Test
+  public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception {
+    final TSubscribeReq subscribeReq =
+        new TSubscribeReq(
+            "old_consumer", "test_consumer_group", Collections.singleton("test_topic"));
+    final CreateSubscriptionProcedure procedure = new CreateSubscriptionProcedure(subscribeReq);
+    procedure.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));
+
+    final CreatePipeProcedureV2 pipeProcedure = Mockito.mock(CreatePipeProcedureV2.class);
+    Mockito.when(pipeProcedure.constructPlan()).thenReturn(Mockito.mock(CreatePipePlanV2.class));
+    procedure.setCreatePipeProcedures(Collections.singletonList(pipeProcedure));
+
+    final TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    failure.setMessage("consensus write failed");
+
+    try {
+      procedure.executeFromOperateOnConfigNodes(mockConsensusFailureEnv(failure));
+      fail();
+    } catch (SubscriptionException expected) {
+      Assert.assertTrue(expected.getMessage().contains("Failed to create subscription"));
+    }
+  }
+
+  /** Validating twice (as on a retry) must still yield exactly one create-pipe procedure. */
+  @Test
+  public void executeFromValidateShouldResetCreatePipeProceduresOnRetry() throws Exception {
+    final String groupId = "test_consumer_group";
+    final String topicName = "test_topic";
+
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put("username", "user");
+    attributes.put("password", "password");
+    final ConsumerGroupMeta groupMeta =
+        new ConsumerGroupMeta(groupId, 1, new ConsumerMeta("old_consumer", 1, attributes));
+    final TopicMeta topicMeta = new TopicMeta(topicName, 1, Collections.emptyMap());
+
+    // The consumer group is known but has not subscribed to the topic yet.
+    final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class);
+    Mockito.when(subscriptionInfo.getConsumerGroupMeta(groupId)).thenReturn(groupMeta);
+    Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta(groupId))
+        .thenAnswer(invocation -> groupMeta.deepCopy());
+    Mockito.when(subscriptionInfo.isTopicSubscribedByConsumerGroup(topicName, groupId))
+        .thenReturn(false);
+    Mockito.when(subscriptionInfo.deepCopyTopicMeta(topicName)).thenReturn(topicMeta);
+
+    final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);
+    Mockito.when(pipeTaskInfo.checkBeforeCreatePipe(Mockito.any(TCreatePipeReq.class)))
+        .thenReturn(true);
+
+    final CreateSubscriptionProcedure procedure =
+        new CreateSubscriptionProcedure(
+            new TSubscribeReq("old_consumer", groupId, Collections.singleton(topicName)));
+    setField(procedure, "subscriptionInfo", new AtomicReference<>(subscriptionInfo));
+    setField(procedure, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));
+    final ConfigNodeProcedureEnv env = mockCreateSubscriptionValidationEnv();
+
+    // The second pass stands in for a retry of the validate step.
+    procedure.executeFromValidate(env);
+    Assert.assertEquals(1, procedure.getCreatePipeProcedures().size());
+    procedure.executeFromValidate(env);
+    Assert.assertEquals(1, procedure.getCreatePipeProcedures().size());
+  }
+
+  /** Builds a mocked env whose consensus manager answers every write with {@code response}. */
+  private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response)
+      throws Exception {
+    final ConsensusManager consensus = Mockito.mock(ConsensusManager.class);
+    Mockito.when(consensus.write(Mockito.any())).thenReturn(response);
+    final ConfigManager manager = Mockito.mock(ConfigManager.class);
+    Mockito.when(manager.getConsensusManager()).thenReturn(consensus);
+
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(manager);
+    return env;
+  }
+
+  /** Builds a mocked env exposing stubbed permission, pipe plugin and load managers. */
+  private static ConfigNodeProcedureEnv mockCreateSubscriptionValidationEnv() {
+    final ConfigManager manager = Mockito.mock(ConfigManager.class);
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(manager);
+
+    final PermissionManager permissions = Mockito.mock(PermissionManager.class);
+    Mockito.when(permissions.login4Pipe(Mockito.anyString(), Mockito.any()))
+        .thenReturn("hashedPassword");
+    Mockito.when(manager.getPermissionManager()).thenReturn(permissions);
+    final PipePluginInfo pluginInfo = Mockito.mock(PipePluginInfo.class);
+    final PipePluginCoordinator pluginCoordinator = Mockito.mock(PipePluginCoordinator.class);
+    Mockito.when(pluginCoordinator.getPipePluginInfo()).thenReturn(pluginInfo);
+    final PipeManager pipeManager = Mockito.mock(PipeManager.class);
+    Mockito.when(pipeManager.getPipePluginCoordinator()).thenReturn(pluginCoordinator);
+    Mockito.when(manager.getPipeManager()).thenReturn(pipeManager);
+    final LoadManager loadManager = Mockito.mock(LoadManager.class);
+    Mockito.when(loadManager.getRegionLeaderMap()).thenReturn(Collections.emptyMap());
+    Mockito.when(manager.getLoadManager()).thenReturn(loadManager);
+    return env;
+  }
+
+  private static void setField(final Object target, final String fieldName, final Object value)
+      throws Exception {
+    // Walk up the hierarchy so that fields declared on a superclass are found as well.
+    for (Class<?> type = target.getClass(); type != null; type = type.getSuperclass()) {
+      try {
+        final Field declared = type.getDeclaredField(fieldName);
+        declared.setAccessible(true);
+        declared.set(target, value);
+        return;
+      } catch (NoSuchFieldException ignored) {
+        // Not declared at this level; continue with the superclass.
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
index 9ecce2a..910648b 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedureTest.java
@@ -19,24 +19,37 @@
 
 package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
 
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
+import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.impl.pipe.task.DropPipeProcedureV2;
 import org.apache.iotdb.confignode.procedure.impl.subscription.consumer.AlterConsumerGroupProcedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
 import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -91,4 +104,90 @@
       fail();
     }
   }
+
+  /** A non-success top-level consensus status must surface as a SubscriptionException. */
+  @Test
+  public void executeFromOperateOnConfigNodesShouldFailOnTopLevelConsensusError() throws Exception {
+    final TUnsubscribeReq unsubscribeReq =
+        new TUnsubscribeReq(
+            "old_consumer", "test_consumer_group", Collections.singleton("test_topic"));
+    final DropSubscriptionProcedure procedure = new DropSubscriptionProcedure(unsubscribeReq);
+    procedure.setAlterConsumerGroupProcedure(Mockito.mock(AlterConsumerGroupProcedure.class));
+
+    final DropPipeProcedureV2 pipeProcedure = Mockito.mock(DropPipeProcedureV2.class);
+    Mockito.when(pipeProcedure.getPipeName()).thenReturn("pipe_topic");
+    procedure.setDropPipeProcedures(Collections.singletonList(pipeProcedure));
+
+    final TSStatus failure = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    failure.setMessage("consensus write failed");
+    try {
+      procedure.executeFromOperateOnConfigNodes(mockConsensusFailureEnv(failure));
+      fail();
+    } catch (SubscriptionException expected) {
+      Assert.assertTrue(expected.getMessage().contains("Failed to drop subscription"));
+    }
+  }
+
+  /** Validating twice (as on a retry) must still yield exactly one drop-pipe procedure. */
+  @Test
+  public void executeFromValidateShouldResetDropPipeProceduresOnRetry() throws Exception {
+    final String groupId = "test_consumer_group";
+    final String topicName = "test_topic";
+
+    final Map<String, String> attributes = new HashMap<>();
+    attributes.put("username", "user");
+    attributes.put("password", "password");
+    final ConsumerGroupMeta groupMeta =
+        new ConsumerGroupMeta(groupId, 1, new ConsumerMeta("old_consumer", 1, attributes));
+    groupMeta.addSubscription("old_consumer", Collections.singleton(topicName));
+
+    // Serve the subscribed group (and fresh copies of it) from the mocked metadata.
+    final SubscriptionInfo subscriptionInfo = Mockito.mock(SubscriptionInfo.class);
+    Mockito.when(subscriptionInfo.getConsumerGroupMeta(groupId)).thenReturn(groupMeta);
+    Mockito.when(subscriptionInfo.deepCopyConsumerGroupMeta(groupId))
+        .thenAnswer(invocation -> groupMeta.deepCopy());
+
+    final PipeTaskInfo pipeTaskInfo = Mockito.mock(PipeTaskInfo.class);
+    final DropSubscriptionProcedure procedure =
+        new DropSubscriptionProcedure(
+            new TUnsubscribeReq("old_consumer", groupId, Collections.singleton(topicName)));
+    setField(procedure, "subscriptionInfo", new AtomicReference<>(subscriptionInfo));
+    setField(procedure, "pipeTaskInfo", new AtomicReference<>(pipeTaskInfo));
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+
+    // The second pass stands in for a retry of the validate step.
+    procedure.executeFromValidate(env);
+    Assert.assertEquals(1, procedure.getDropPipeProcedures().size());
+    procedure.executeFromValidate(env);
+    Assert.assertEquals(1, procedure.getDropPipeProcedures().size());
+  }
+
+  /** Builds a mocked env whose consensus manager answers every write with {@code response}. */
+  private static ConfigNodeProcedureEnv mockConsensusFailureEnv(final TSStatus response)
+      throws Exception {
+    final ConsensusManager consensus = Mockito.mock(ConsensusManager.class);
+    Mockito.when(consensus.write(Mockito.any())).thenReturn(response);
+    final ConfigManager manager = Mockito.mock(ConfigManager.class);
+    Mockito.when(manager.getConsensusManager()).thenReturn(consensus);
+
+    final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(manager);
+    return env;
+  }
+
+  private static void setField(final Object target, final String fieldName, final Object value)
+      throws Exception {
+    // Walk up the hierarchy so that fields declared on a superclass are found as well.
+    for (Class<?> type = target.getClass(); type != null; type = type.getSuperclass()) {
+      try {
+        final Field declared = type.getDeclaredField(fieldName);
+        declared.setAccessible(true);
+        declared.set(target, value);
+        return;
+      } catch (NoSuchFieldException ignored) {
+        // Not declared at this level; continue with the superclass.
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f89bfbc..83e5b37 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -72,8 +72,11 @@
     final ConsumerGroupMeta copied = new ConsumerGroupMeta();
     copied.consumerGroupId = consumerGroupId;
     copied.creationTime = creationTime;
-    copied.topicNameToSubscribedConsumerIdSet =
-        new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
+    copied.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
+    topicNameToSubscribedConsumerIdSet.forEach(
+        (topicName, subscribedConsumerIds) ->
+            copied.topicNameToSubscribedConsumerIdSet.put(
+                topicName, new HashSet<>(subscribedConsumerIds)));
     copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta);
     copied.topicNameToSubscriptionCreationTime =
         new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime);
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
index d41b370..9ef7191 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
@@ -69,4 +69,24 @@
         consumerGroupMeta.getConsumerGroupId(), consumerGroupMeta2.getConsumerGroupId());
     Assert.assertEquals(consumerGroupMeta.getCreationTime(), consumerGroupMeta2.getCreationTime());
   }
+
+  /** Mutating the subscriptions of a deep copy must leave the original group untouched. */
+  @Test
+  public void testDeepCopyShouldNotShareSubscribedConsumerSets() {
+    Map<String, String> attributes = new HashMap<>();
+    attributes.put("username", "user");
+    attributes.put("password", "password");
+    ConsumerMeta consumer = new ConsumerMeta("test_consumer1", 1, attributes);
+    ConsumerGroupMeta original = new ConsumerGroupMeta("test_consumer_group", 1, consumer);
+    original.addSubscription("test_consumer1", Collections.singleton("test_topic"));
+
+    // Unsubscribe through the copy only.
+    ConsumerGroupMeta copy = original.deepCopy();
+    copy.removeSubscription("test_consumer1", Collections.singleton("test_topic"));
+
+    // The original keeps its subscriber while the copy has none left.
+    Assert.assertTrue(
+        original.getConsumersSubscribingTopic("test_topic").contains("test_consumer1"));
+    Assert.assertTrue(copy.getConsumersSubscribingTopic("test_topic").isEmpty());
+  }
 }