IGNITE-15444 Implement MetaStorageManager component stop (#332)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
new file mode 100644
index 0000000..06a9d48
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Synchronization aid to track "busy" state of a subsystem that owns it.
+ * <p>
+ * This class is implemented
+ * over {@link IgniteSpinReadWriteLock}.
+ * <p>
+ * For example, there may be a manager that have different threads for some
+ * purposes and the manager must not be stopped while at least a single thread
+ * is in "busy" state. In this situation each thread must enter to "busy"
+ * state calling method {@link #enterBusy()} in critical pieces of code
+ * which, i.e. use grid kernal functionality, notifying that the manager
+ * and the whole grid kernal cannot be stopped while it's in progress. Once
+ * the activity is done, the thread should leave "busy" state calling method
+ * {@link #leaveBusy()}. The manager itself, when stopping, should call method
+ * {@link #block} that blocks till all activities leave "busy" state.
+ * @see IgniteSpinReadWriteLock
+ */
+public class IgniteSpinBusyLock {
+ /** Underlying read-write lock. */
+ private final IgniteSpinReadWriteLock lock = new IgniteSpinReadWriteLock();
+
+ /**
+ * Enters "busy" state.
+ *
+ * @return {@code true} if entered to busy state.
+ */
+ public boolean enterBusy() {
+ return !lock.writeLockedByCurrentThread() && lock.tryReadLock();
+ }
+
+ /**
+ * Checks if busy lock was blocked by current thread.
+ *
+ * @return {@code True} if busy lock was blocked by current thread.
+ */
+ public boolean blockedByCurrentThread() {
+ return lock.writeLockedByCurrentThread();
+ }
+
+ /**
+ * Leaves "busy" state.
+ */
+ public void leaveBusy() {
+ lock.readUnlock();
+ }
+
+ /**
+ * Blocks current thread till all activities left "busy" state
+ * and prevents them from further entering to "busy" state.
+ */
+ public void block() {
+ lock.writeLock();
+ }
+
+ /**
+ * @param millis Timeout.
+ * @return {@code True} if lock was acquired.
+ * @throws InterruptedException If interrupted.
+ */
+ public boolean tryBlock(long millis) throws InterruptedException {
+ return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Makes possible for activities entering busy state again.
+ */
+ public void unblock() {
+ lock.writeUnlock();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
new file mode 100644
index 0000000..f1deb9f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Spin read-write lock.
+ */
+public class IgniteSpinReadWriteLock {
+ /** */
+ private static final long PENDING_WLOCKS_OFFS;
+
+ /** */
+ private static final long STATE_OFFS;
+
+ /**
+ * TODO: replace UNSAFE usage using VarHandle https://issues.apache.org/jira/browse/IGNITE-15536
+ */
+ static {
+ try {
+ STATE_OFFS = GridUnsafe.objectFieldOffset(IgniteSpinReadWriteLock.class.getDeclaredField("state"));
+
+ PENDING_WLOCKS_OFFS =
+ GridUnsafe.objectFieldOffset(IgniteSpinReadWriteLock.class.getDeclaredField("pendingWLocks"));
+ }
+ catch (NoSuchFieldException e) {
+ throw new Error(e);
+ }
+ }
+
+ /** */
+ private final ThreadLocal<Integer> readLockEntryCnt = new ThreadLocal<Integer>() {
+ @Override protected Integer initialValue() {
+ return 0;
+ }
+ };
+
+ /** */
+ private volatile int state;
+
+ /** */
+ private volatile int pendingWLocks;
+
+ /** */
+ private long writeLockOwner = -1;
+
+ /** */
+ private int writeLockEntryCnt;
+
+ /**
+ * Acquires read lock.
+ */
+ @SuppressWarnings("BusyWait")
+ public void readLock() {
+ int cnt = readLockEntryCnt.get();
+
+ // Read lock reentry or acquiring read lock while holding write lock.
+ if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt + 1);
+
+ return;
+ }
+
+ boolean interrupted = false;
+
+ while (true) {
+ int cur = state;
+
+ assert cur >= -1;
+
+ if (cur == -1 || pendingWLocks > 0) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+
+ continue;
+ }
+
+ if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ break;
+ }
+ }
+
+ readLockEntryCnt.set(1);
+ }
+
+ /**
+ * Tries to acquire read lock.
+ *
+ * @return {@code true} if acquired.
+ */
+ public boolean tryReadLock() {
+ int cnt = readLockEntryCnt.get();
+
+ // Read lock reentry or acquiring read lock while holding write lock.
+ if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt + 1);
+
+ return true;
+ }
+
+ while (true) {
+ int cur = state;
+
+ if (cur == -1 || pendingWLocks > 0)
+ return false;
+
+ if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+ readLockEntryCnt.set(1);
+
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Read unlock.
+ */
+ public void readUnlock() {
+ int cnt = readLockEntryCnt.get();
+
+ if (cnt == 0)
+ throw new IllegalMonitorStateException();
+
+ // Read unlock when holding write lock is performed here.
+ if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
+ assert state > 0 || state == -1;
+
+ readLockEntryCnt.set(cnt - 1);
+
+ return;
+ }
+
+ while (true) {
+ int cur = state;
+
+ assert cur > 0;
+
+ if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
+ readLockEntryCnt.set(0);
+
+ return;
+ }
+ }
+ }
+
+ /**
+ * Acquires write lock.
+ */
+ @SuppressWarnings("BusyWait")
+ public void writeLock() {
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return;
+ }
+
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ boolean interrupted = false;
+
+ while (!compareAndSet(STATE_OFFS, 0, -1)) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignored) {
+ interrupted = true;
+ }
+ }
+
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+
+ if (interrupted)
+ Thread.currentThread().interrupt();
+
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+ }
+
+ /**
+ * Acquires write lock without sleeping between unsuccessful attempts.
+ */
+ public void writeLock0() {
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return;
+ }
+
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ for (;;) {
+ if (compareAndSet(STATE_OFFS, 0, -1))
+ break;
+ }
+
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+ }
+
+ /**
+ * @return {@code True} if blocked by current thread.
+ */
+ public boolean writeLockedByCurrentThread() {
+ return writeLockOwner == Thread.currentThread().getId();
+ }
+
+ /**
+ * Tries to acquire write lock.
+ *
+ * @return {@code True} if write lock has been acquired.
+ */
+ public boolean tryWriteLock() {
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return true;
+ }
+
+ if (compareAndSet(STATE_OFFS, 0, -1)) {
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param timeout Timeout.
+ * @param unit Unit.
+ * @return {@code True} if write lock has been acquired.
+ * @throws InterruptedException If interrupted.
+ */
+ @SuppressWarnings("BusyWait")
+ public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException {
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId == writeLockOwner) {
+ assert state == -1;
+
+ writeLockEntryCnt++;
+
+ return true;
+ }
+
+ try {
+ // Increment pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+ break;
+ }
+
+ long startNanos = System.nanoTime();
+
+ long timeoutNanos = unit.toNanos(timeout);
+
+ while (true) {
+ if (compareAndSet(STATE_OFFS, 0, -1)) {
+ assert writeLockOwner == -1;
+
+ writeLockOwner = threadId;
+ writeLockEntryCnt = 1;
+
+ return true;
+ }
+
+ Thread.sleep(10);
+
+ if (System.nanoTime() - startNanos >= timeoutNanos)
+ return false;
+ }
+ }
+ finally {
+ // Decrement pending write locks.
+ while (true) {
+ int pendingWLocks0 = pendingWLocks;
+
+ assert pendingWLocks0 > 0;
+
+ if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+ break;
+ }
+ }
+ }
+
+ /**
+ * Releases write lock.
+ */
+ public void writeUnlock() {
+ long threadId = Thread.currentThread().getId();
+
+ if (threadId != writeLockOwner)
+ throw new IllegalMonitorStateException();
+
+ if (writeLockEntryCnt > 1) {
+ writeLockEntryCnt--;
+
+ return;
+ }
+
+ writeLockEntryCnt = 0;
+ writeLockOwner = -1;
+
+ // Current thread holds write and read locks and is releasing
+ // write lock now.
+ int update = readLockEntryCnt.get() > 0 ? 1 : 0;
+
+ boolean b = compareAndSet(STATE_OFFS, -1, update);
+
+ assert b;
+ }
+
+ /**
+ * @param offs Offset.
+ * @param expect Expected.
+ * @param update Update.
+ * @return {@code True} on success.
+ */
+ private boolean compareAndSet(long offs, int expect, int update) {
+ return GridUnsafe.compareAndSwapInt(this, offs, expect, update);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteSpinReadWriteLock.class, this);
+ }
+}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
index 79c2daf..aae07fc 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
@@ -76,7 +76,10 @@
cursorId -> metaStorageRaftGrpSvc.run(new CursorCloseCommand(cursorId))).get();
}
catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to evaluate cursor close command", e);
+ if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
+ return;
+
+ LOG.debug("Unable to evaluate cursor close command", e);
throw new IgniteInternalException(e);
}
@@ -101,10 +104,10 @@
cursorId -> metaStorageRaftGrpSvc.<Boolean>run(new CursorHasNextCommand(cursorId))).get();
}
catch (InterruptedException | ExecutionException e) {
- if (e.getCause().getClass().equals(NodeStoppingException.class))
+ if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
return false;
- LOG.error("Unable to evaluate cursor hasNext command", e);
+ LOG.debug("Unable to evaluate cursor hasNext command", e);
throw new IgniteInternalException(e);
}
@@ -122,10 +125,10 @@
return fn.apply(res);
}
catch (InterruptedException | ExecutionException e) {
- if (e.getCause().getClass().equals(NodeStoppingException.class))
+ if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
throw new NoSuchElementException();
- LOG.error("Unable to evaluate cursor hasNext command", e);
+ LOG.debug("Unable to evaluate cursor hasNext command", e);
throw new IgniteInternalException(e);
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index 482489a..266f2ef 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -27,6 +27,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
@@ -400,10 +401,12 @@
watchers.computeIfPresent(
watchId,
(k, v) -> {
- CompletableFuture.runAsync(v::interrupt).thenRun(() -> {
- try {
- v.stop = true;
+ CompletableFuture.runAsync(() -> {
+ v.stop = true;
+ v.interrupt();
+ }).thenRun(() -> {
+ try {
Thread.sleep(100);
v.cursor.close();
@@ -412,6 +415,11 @@
throw new IgniteInternalException(e);
}
catch (Exception e) {
+ if (e instanceof IgniteInternalException && e.getCause().getCause() instanceof RejectedExecutionException) {
+ LOG.warn("Cursor close command was rejected because raft executor has been already stopped.");
+ return;
+ }
+
// TODO: IGNITE-14693 Implement Meta storage exception handling logic.
LOG.error("Unexpected exception", e);
}
@@ -470,7 +478,11 @@
catch (Throwable e) {
if (e instanceof NodeStoppingException || e.getCause() instanceof NodeStoppingException)
break;
- else {
+ else if ((e instanceof InterruptedException || e.getCause() instanceof InterruptedException) && stop) {
+ LOG.debug("Watcher has been stopped during node's stop");
+
+ break;
+ } else {
// TODO: IGNITE-14693 Implement Meta storage exception handling logic.
LOG.error("Unexpected exception", e);
}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 8eee25e..273b9dc 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -46,15 +47,19 @@
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -72,6 +77,9 @@
// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
@SuppressWarnings("unused")
public class MetaStorageManager implements IgniteComponent {
+ /** Logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageManager.class);
+
/** Meta storage raft group name. */
private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
@@ -96,6 +104,9 @@
/** Meta storage service. */
private volatile CompletableFuture<MetaStorageService> metaStorageSvcFut;
+ /** Raft group service. */
+ private volatile CompletableFuture<RaftGroupService> raftGroupServiceFut;
+
/**
* Aggregator of multiple watches to deploy them as one batch.
*
@@ -124,6 +135,9 @@
/** Actual storage for the Metastorage. */
private final KeyValueStorage storage;
+ /** Busy lock for stop synchronisation. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
/**
* The constructor.
*
@@ -164,11 +178,13 @@
storage.start();
- this.metaStorageSvcFut = raftMgr.prepareRaftGroup(
+ raftGroupServiceFut = raftMgr.prepareRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
metaStorageMembers,
() -> new MetaStorageListener(storage)
- ).thenApply(service ->
+ );
+
+ this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
);
@@ -201,7 +217,47 @@
/** {@inheritDoc} */
@Override public void stop() {
- // TODO: IGNITE-15161 Implement component's stop.
+ busyLock.block();
+
+ Optional<IgniteUuid> watchId;
+
+ try {
+ // If deployed future is not done, that means that stop was called in the middle of
+ // IgniteImpl.start, before deployWatches, or before init phase.
+ // It is correct to check completeness of the future because the method calls are guarded by busy lock.
+ // TODO: add busy lock for init method https://issues.apache.org/jira/browse/IGNITE-14414
+ if (deployFut.isDone()) {
+ watchId = deployFut.get();
+
+ try {
+ if (watchId.isPresent())
+ metaStorageSvcFut.get().stopWatch(watchId.get());
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to get meta storage service.");
+
+ throw new IgniteInternalException(e);
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to get watch.");
+
+ throw new IgniteInternalException(e);
+ }
+
+ try {
+ if (raftGroupServiceFut != null) {
+ raftGroupServiceFut.get().shutdown();
+
+ raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME, metastorageNodes());
+ }
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Failed to get meta storage raft group service.");
+
+ throw new IgniteInternalException(e);
+ }
+
try {
storage.close();
}
@@ -213,26 +269,33 @@
/**
* Deploy all registered watches.
*/
- public synchronized void deployWatches() {
- var watch = watchAggregator.watch(
- appliedRevision() + 1,
- this::storeEntries
- );
+ public synchronized void deployWatches() throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
- if (watch.isEmpty())
- deployFut.complete(Optional.empty());
- else {
- CompletableFuture<Void> fut =
- dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id)));
+ try {
+ var watch = watchAggregator.watch(
+ appliedRevision() + 1,
+ this::storeEntries
+ );
- if (metaStorageNodesOnStart)
- fut.join();
+ if (watch.isEmpty())
+ deployFut.complete(Optional.empty());
else {
- // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-14414
- }
- }
+ CompletableFuture<Void> fut =
+ dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id)));
- deployed = true;
+ if (metaStorageNodesOnStart)
+ fut.join();
+ else {
+ // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-14414
+ }
+ }
+
+ deployed = true;
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -247,7 +310,14 @@
@Nullable ByteArray key,
@NotNull WatchListener lsnr
) {
- return waitForReDeploy(watchAggregator.add(key, lsnr));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return waitForReDeploy(watchAggregator.add(key, lsnr));
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -262,7 +332,15 @@
@Nullable ByteArray key,
@NotNull WatchListener lsnr
) {
- return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -277,7 +355,15 @@
@NotNull Collection<ByteArray> keys,
@NotNull WatchListener lsnr
) {
- return waitForReDeploy(watchAggregator.add(keys, lsnr));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return waitForReDeploy(watchAggregator.add(keys, lsnr));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -293,7 +379,15 @@
@NotNull ByteArray to,
@NotNull WatchListener lsnr
) {
- return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -303,95 +397,199 @@
* @return future, which will be completed when unregister finished.
*/
public synchronized CompletableFuture<Void> unregisterWatch(long id) {
- watchAggregator.cancel(id);
- if (deployed)
- return updateWatches().thenAccept(v -> {});
- else
- return deployFut.thenAccept(uuid -> {});
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ watchAggregator.cancel(id);
+ if (deployed)
+ return updateWatches().thenAccept(v -> {});
+ else
+ return deployFut.thenAccept(uuid -> {});
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#get(ByteArray)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#get(ByteArray, long)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) {
- return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAll(Set)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAll(Set, long)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#put(ByteArray, byte[])
*/
public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) {
- return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAndPut(ByteArray, byte[])
*/
public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#putAll(Map)
*/
public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
- return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAndPutAll(Map)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#remove(ByteArray)
*/
public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAndRemove(ByteArray)
*/
public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#removeAll(Set)
*/
public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#getAndRemoveAll(Set)
*/
public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -404,7 +602,15 @@
@NotNull Operation success,
@NotNull Operation failure
) {
- return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -415,17 +621,32 @@
@NotNull Collection<Operation> success,
@NotNull Collection<Operation> failure
) {
- return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#range(ByteArray, ByteArray, long)
*/
- public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
- );
+ public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ return new CursorWrapper<>(
+ metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -441,21 +662,35 @@
* @see ByteArray
* @see Entry
*/
- public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision()))
- );
+ public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ return new CursorWrapper<>(
+ metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision()))
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#range(ByteArray, ByteArray)
*/
- public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
- );
+ public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ return new CursorWrapper<>(
+ metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -472,13 +707,20 @@
* @see ByteArray
* @see Entry
*/
- public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray keyPrefix) {
- var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+ public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray keyPrefix) throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
- );
+ try {
+ var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+
+ return new CursorWrapper<>(
+ metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -492,7 +734,7 @@
* @see ByteArray
* @see Entry
*/
- public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix) {
+ public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix) throws NodeStoppingException {
return prefix(keyPrefix, -1);
}
@@ -510,19 +752,34 @@
* @see ByteArray
* @see Entry
*/
- public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long revUpperBound) {
- var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), revUpperBound))
- );
+ public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long revUpperBound) throws NodeStoppingException {
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+ try {
+ var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+ return new CursorWrapper<>(
+ metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), revUpperBound))
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
* @see MetaStorageService#compact()
*/
public @NotNull CompletableFuture<Void> compact() {
- return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
+ if (!busyLock.enterBusy())
+ return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+ try {
+ return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -635,9 +892,6 @@
// TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
/** Cursor wrapper. */
private final class CursorWrapper<T> implements Cursor<T> {
- /** Meta storage service future. */
- private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
-
/** Inner cursor future. */
private final CompletableFuture<Cursor<T>> innerCursorFut;
@@ -647,30 +901,35 @@
private final InnerIterator it = new InnerIterator();
/**
- * @param metaStorageSvcFut Meta storage service future.
* @param innerCursorFut Inner cursor future.
*/
CursorWrapper(
- CompletableFuture<MetaStorageService> metaStorageSvcFut,
CompletableFuture<Cursor<T>> innerCursorFut
) {
- this.metaStorageSvcFut = metaStorageSvcFut;
this.innerCursorFut = innerCursorFut;
this.innerIterFut = innerCursorFut.thenApply(Iterable::iterator);
}
/** {@inheritDoc} */
@Override public void close() throws Exception {
- innerCursorFut.thenApply(cursor -> {
- try {
- cursor.close();
+ if (!busyLock.enterBusy())
+ throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
- return null;
- }
- catch (Exception e) {
- throw new IgniteInternalException(e);
- }
- }).get();
+ try {
+ innerCursorFut.thenApply(cursor -> {
+ try {
+ cursor.close();
+
+ return null;
+ }
+ catch (Exception e) {
+ throw new IgniteInternalException(e);
+ }
+ }).get();
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
/** {@inheritDoc} */
@@ -689,22 +948,39 @@
}
private class InnerIterator implements Iterator<T> {
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
+ if (!busyLock.enterBusy())
+ return false;
+
try {
- return innerIterFut.thenApply(Iterator::hasNext).get();
+ try {
+ return innerIterFut.thenApply(Iterator::hasNext).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(e);
+ }
}
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(e);
+ finally {
+ busyLock.leaveBusy();
}
}
/** {@inheritDoc} */
@Override public T next() {
+ if (!busyLock.enterBusy())
+ throw new NoSuchElementException("No such element because node is stopping.");
+
try {
- return innerIterFut.thenApply(Iterator::next).get();
+ try {
+ return innerIterFut.thenApply(Iterator::next).get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ throw new IgniteInternalException(e);
+ }
}
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(e);
+ finally {
+ busyLock.leaveBusy();
}
}
}
@@ -745,4 +1021,24 @@
else
throw new UnsupportedOperationException("Unsupported type of criterion");
}
+
+ /**
+ * Return metastorage nodes.
+ *
+ * This code will be deleted after node init phase is developed.
+ * https://issues.apache.org/jira/browse/IGNITE-14414
+ */
+ private List<ClusterNode> metastorageNodes() {
+ String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+ .metastorageNodes().value();
+
+ Predicate<ClusterNode> metaStorageNodesContainsLocPred =
+ clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+
+ List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
+ .filter(metaStorageNodesContainsLocPred)
+ .collect(Collectors.toList());
+
+ return metaStorageMembers;
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index ba63c0c..181e7ca 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -118,7 +118,7 @@
String locNodeName = clusterNetSvc.topologyService().localMember().name();
- if (nodes.stream().map(ClusterNode::name).collect(Collectors.toSet()).contains(locNodeName))
+ if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
raftServer.stopRaftGroup(groupId);
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index fa5994c..0a3dcda 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -28,6 +28,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterService;
@@ -96,8 +97,8 @@
/** */
private final long retryDelay;
- /** */
- private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ /** TODO: Use shared executors instead https://issues.apache.org/jira/browse/IGNITE-15136 */
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Raft-Group-Service-Pool"));
/**
* Constructor.
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index f36e04a..d824a1c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -49,6 +49,7 @@
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.schema.ColumnType;
@@ -364,7 +365,7 @@
workDir
);
- TableImpl tbl2;
+ TableImpl tbl2 = null;
try {
tableManager.start();
@@ -393,24 +394,31 @@
tableCreatedFlag.set(createTbl);
- when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
- AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
+ try {
+ when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
+ AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
- Cursor<Entry> cursor = mock(Cursor.class);
+ Cursor<Entry> cursor = mock(Cursor.class);
- when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
- firstRecord.compareAndSet(true, false));
+ when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
+ firstRecord.compareAndSet(true, false));
- Entry mockEntry = mock(Entry.class);
+ Entry mockEntry = mock(Entry.class);
- when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));
+ when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));
- when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));
+ when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));
- when(cursor.next()).thenReturn(mockEntry);
+ when(cursor.next()).thenReturn(mockEntry);
- return cursor;
- });
+ return cursor;
+ });
+ }
+ catch (NodeStoppingException e) {
+ LOG.error("Node was stopped during table creation.", e);
+
+ fail();
+ }
if (phaser != null)
phaser.arriveAndAwaitAdvance();
@@ -427,6 +435,11 @@
assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
}
+ catch (NodeStoppingException e) {
+ LOG.error("Node was stopped during table creation.", e);
+
+ fail();
+ }
finally {
tableManager.stop();
}