Revert "Fix CN pipe procedures restore dead lock (#16324)"
This reverts commit b422e9a5c59391a2aa17234c3cf5a230bbce9830.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
index aaafcda..1df5ac0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeMetaSyncer.java
@@ -146,7 +146,7 @@
private boolean autoRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for auto restart pipe task.");
return false;
@@ -160,7 +160,7 @@
private boolean handleSuccessfulRestartWithLock() {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn("Failed to acquire pipe lock for handling successful restart.");
return false;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
index 8e13a046..ace07f5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java
@@ -99,7 +99,7 @@
.submit(
() -> {
final AtomicReference<PipeTaskInfo> pipeTaskInfo =
- configManager.getPipeManager().getPipeTaskCoordinator().tryLock().left;
+ configManager.getPipeManager().getPipeTaskCoordinator().tryLock();
if (pipeTaskInfo == null) {
LOGGER.warn(
"Failed to acquire lock when parseHeartbeat from node (id={}).", nodeId);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index 25f8e0c..4aaf3ab 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -37,7 +37,6 @@
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,11 +68,10 @@
* @return the pipe task info holder, which can be used to get the pipe task info. The holder is
* null if the lock is not acquired.
*/
- public Pair<AtomicReference<PipeTaskInfo>, Long> tryLock() {
- long lockSeqId = pipeTaskCoordinatorLock.tryLock();
- if (lockSeqId != -1) {
+ public AtomicReference<PipeTaskInfo> tryLock() {
+ if (pipeTaskCoordinatorLock.tryLock()) {
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+ return pipeTaskInfoHolder;
}
return null;
@@ -85,10 +83,10 @@
* @return the {@link PipeTaskInfo} holder, which can be used to get the {@link PipeTaskInfo}.
* Wait until lock is acquired
*/
- public Pair<AtomicReference<PipeTaskInfo>, Long> lock() {
- long lockSeqId = pipeTaskCoordinatorLock.lock();
+ public AtomicReference<PipeTaskInfo> lock() {
+ pipeTaskCoordinatorLock.lock();
pipeTaskInfoHolder = new AtomicReference<>(pipeTaskInfo);
- return new Pair<>(pipeTaskInfoHolder, lockSeqId);
+ return pipeTaskInfoHolder;
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
index 0caf620..e57add9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinatorLock.java
@@ -37,9 +37,8 @@
private final BlockingDeque<Long> deque = new LinkedBlockingDeque<>(1);
private final AtomicLong idGenerator = new AtomicLong(0);
- private final AtomicLong lockSeqIdGenerator = new AtomicLong(0);
- public long lock() {
+ public void lock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -51,17 +50,15 @@
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
- return lockSeqIdGenerator.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
Thread.currentThread().getName());
- return -1;
}
}
- public long tryLock() {
+ public boolean tryLock() {
try {
final long id = idGenerator.incrementAndGet();
LOGGER.debug(
@@ -73,20 +70,20 @@
"PipeTaskCoordinator lock (id: {}) acquired by thread {}",
id,
Thread.currentThread().getName());
- return lockSeqIdGenerator.incrementAndGet();
+ return true;
} else {
LOGGER.info(
"PipeTaskCoordinator lock (id: {}) failed to acquire by thread {} because of timeout",
id,
Thread.currentThread().getName());
- return -1;
+ return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.error(
"Interrupted while waiting for PipeTaskCoordinator lock, current thread: {}",
Thread.currentThread().getName());
- return -1;
+ return false;
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index d90e1bb..b52f958 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -79,7 +79,7 @@
/////////////////////////////// Lock ///////////////////////////////
public AtomicReference<SubscriptionInfo> tryLock() {
- if (coordinatorLock.tryLock() != -1) {
+ if (coordinatorLock.tryLock()) {
subscriptionInfoHolder = new AtomicReference<>(subscriptionInfo);
return subscriptionInfoHolder;
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index f0f28ec..0d83685 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
-import org.apache.iotdb.confignode.procedure.impl.pipe.AbstractOperatePipeProcedureV2;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
@@ -38,12 +37,10 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -52,7 +49,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
@@ -120,41 +116,9 @@
recover();
}
- /***
- * Filter out pipe procedures that do not need to re-acquire lock and re-execute when there are multiple locked pipe procedures during restore.
- * @return non pipe procedures and one pipe procedure with max lock seq id (if there is.)
- */
- private List<Procedure<Env>> filteredProcedureList(final List<Procedure<Env>> procedures) {
- List<Procedure<Env>> nonPipeOrLockedProcedures =
- procedures.stream()
- .filter(p -> !(p instanceof AbstractOperatePipeProcedureV2) || !p.isLockedWhenLoading())
- .collect(Collectors.toList());
-
- List<AbstractOperatePipeProcedureV2> lockedPipeProcedures =
- procedures.stream()
- .filter(p -> p instanceof AbstractOperatePipeProcedureV2 && p.isLockedWhenLoading())
- .map(AbstractOperatePipeProcedureV2.class::cast)
- .collect(Collectors.toList());
- Optional<Procedure<Env>> maxPipeProcedure =
- lockedPipeProcedures.stream()
- .max(Comparator.comparingLong(AbstractOperatePipeProcedureV2::getLockSeqId))
- .map(p -> (Procedure<Env>) p);
-
- if (lockedPipeProcedures.size() > 1) {
- LOG.warn(
- "[Procedure restore]Detected multiple locked pipe procedures in procedure executor {}, only keep last one {}",
- lockedPipeProcedures,
- maxPipeProcedure.get());
- }
-
- maxPipeProcedure.ifPresent(nonPipeOrLockedProcedures::add);
- return nonPipeOrLockedProcedures;
- }
-
private void recover() {
// 1.Build rollback stack
- List<Procedure<Env>> procedureList =
- filteredProcedureList(getProcedureListFromDifferentVersion());
+ List<Procedure<Env>> procedureList = getProcedureListFromDifferentVersion();
// Load procedure wal file
for (Procedure<Env> proc : procedureList) {
if (proc.isFinished()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 0dde52a..25466d3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -37,7 +37,6 @@
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,17 +94,17 @@
// This variable should not be serialized into procedure store,
// putting it here is just for convenience
protected AtomicReference<PipeTaskInfo> pipeTaskInfo;
- protected long lockSeqId;
private static final String SKIP_PIPE_PROCEDURE_MESSAGE =
"Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing.";
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
- configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .lock();
}
@Override
@@ -189,10 +188,6 @@
}
}
- public long getLockSeqId() {
- return lockSeqId;
- }
-
protected abstract PipeTaskOperation getOperation();
/**
@@ -617,15 +612,11 @@
public void serialize(DataOutputStream stream) throws IOException {
super.serialize(stream);
ReadWriteIOUtils.write(isRollbackFromOperateOnDataNodesSuccessful, stream);
- ReadWriteIOUtils.write(lockSeqId, stream);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
isRollbackFromOperateOnDataNodesSuccessful = ReadWriteIOUtils.readBool(byteBuffer);
- if (byteBuffer.remaining() >= Long.BYTES) {
- lockSeqId = ReadWriteIOUtils.readLong(byteBuffer);
- }
}
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 28c5993..665a378 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -122,7 +122,7 @@
final SubscriptionCoordinator subscriptionCoordinator =
env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator();
- final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock().left;
+ final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock();
pipePluginCoordinator.lock();
SubscriptionInfo subscriptionInfo = subscriptionCoordinator.getSubscriptionInfo();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 6b2fec6..401859f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -31,7 +31,6 @@
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,14 +64,11 @@
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
}
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 81ed75b..393a8bd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -39,7 +39,6 @@
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,14 +73,11 @@
@Override
protected AtomicReference<PipeTaskInfo> acquireLockInternal(
ConfigNodeProcedureEnv configNodeProcedureEnv) {
- Pair<AtomicReference<PipeTaskInfo>, Long> lockRes =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .tryLock();
- lockSeqId = lockRes.right;
- return lockRes.left;
+ return configNodeProcedureEnv
+ .getConfigManager()
+ .getPipeManager()
+ .getPipeTaskCoordinator()
+ .tryLock();
}
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
index 16391a9..de90951 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/AbstractOperateSubscriptionAndPipeProcedure.java
@@ -50,12 +50,7 @@
LOGGER.info("ProcedureId {} try to acquire subscription and pipe lock.", getProcId());
pipeTaskInfo =
- configNodeProcedureEnv
- .getConfigManager()
- .getPipeManager()
- .getPipeTaskCoordinator()
- .lock()
- .left;
+ configNodeProcedureEnv.getConfigManager().getPipeManager().getPipeTaskCoordinator().lock();
if (pipeTaskInfo == null) {
LOGGER.warn("ProcedureId {} failed to acquire pipe lock.", getProcId());
} else {