CURATOR-688. Fix SharedCount/SharedValue not update after Stat.version overflowed (#478)
Co-authored-by: Kezhu Wang <kezhuw@apache.org>
Co-authored-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
new file mode 100644
index 0000000..58e8339
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.shared;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Exception to alert overflowed {@link Stat#getVersion()} {@code -1} which is not suitable in
+ * {@link SharedValue#trySetValue(VersionedValue, byte[])} and {@link SharedCount#trySetCount(VersionedValue, int)}.
+ *
+ * <p>In case of this exception, clients have to choose:
+ * <ul>
+ * <li>Take their own risk to do a blind set.</li>
+ * <li>Update ZooKeeper cluster to solve <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-4743">ZOOKEEPER-4743</a>.</li>
+ * </ul>
+ */
+public class IllegalTrySetVersionException extends IllegalArgumentException {
+ @Override
+ public String getMessage() {
+ return "overflowed Stat.version -1 is not suitable for trySet(a.k.a. compare-and-set ZooKeeper::setData)";
+ }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index c401a3b..2ecc528 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -30,6 +30,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionState;
+import org.apache.zookeeper.data.Stat;
/**
* Manages a shared integer. All clients watching the same path will have the up-to-date
@@ -60,7 +61,7 @@
@Override
public VersionedValue<Integer> getVersionedValue() {
VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
- return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
+ return localValue.mapValue(SharedCount::fromBytes);
}
/**
@@ -102,11 +103,11 @@
* @param newCount the new value to attempt
* @return true if the change attempt was successful, false if not. If the change
* was not successful, {@link #getCount()} will return the updated value
+ * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception {
- VersionedValue<byte[]> previousCopy =
- new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
+ VersionedValue<byte[]> previousCopy = previous.mapValue(SharedCount::toBytes);
return sharedValue.trySetValue(previousCopy, toBytes(newCount));
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 50e50ed..ddb96fc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -48,6 +48,7 @@
* value (considering ZK's normal consistency guarantees).
*/
public class SharedValue implements Closeable, SharedValueReader {
+ private static final int NO_ZXID = -1;
private static final int UNINITIALIZED_VERSION = -1;
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -101,8 +102,8 @@
this.path = PathUtils.validatePath(path);
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
this.watcher = new SharedValueCuratorWatcher();
- currentValue = new AtomicReference<VersionedValue<byte[]>>(
- new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+ currentValue = new AtomicReference<>(
+ new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}
@VisibleForTesting
@@ -112,8 +113,8 @@
this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
// inject watcher for testing
this.watcher = watcher;
- currentValue = new AtomicReference<VersionedValue<byte[]>>(
- new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+ currentValue = new AtomicReference<>(
+ new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
}
@Override
@@ -125,12 +126,11 @@
@Override
public VersionedValue<byte[]> getVersionedValue() {
VersionedValue<byte[]> localCopy = currentValue.get();
- return new VersionedValue<byte[]>(
- localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length));
+ return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length));
}
/**
- * Change the shared value value irrespective of its previous state
+ * Change the shared value irrespective of its previous state
*
* @param newValue new value
* @throws Exception ZK errors, interruptions, etc.
@@ -139,7 +139,7 @@
Preconditions.checkState(state.get() == State.STARTED, "not started");
Stat result = client.setData().forPath(path, newValue);
- updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
+ updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
}
/**
@@ -171,19 +171,25 @@
* @param newValue the new value to attempt
* @return true if the change attempt was successful, false if not. If the change
* was not successful, {@link #getValue()} will return the updated value
+ * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} overflowed to {@code -1}
* @throws Exception ZK errors, interruptions, etc.
*/
public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception {
Preconditions.checkState(state.get() == State.STARTED, "not started");
VersionedValue<byte[]> current = currentValue.get();
- if (previous.getVersion() != current.getVersion() || !Arrays.equals(previous.getValue(), current.getValue())) {
+ // Omit comparing of getVersion here, so we can test the exception case.
+ // This affects no correctness as construction of VersionedValue is private.
+ if (previous.getZxid() != current.getZxid() || !Arrays.equals(previous.getValue(), current.getValue())) {
return false;
}
+ if (previous.getVersion() == -1) {
+ throw new IllegalTrySetVersionException();
+ }
try {
Stat result = client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
- updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
+ updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
return true;
} catch (KeeperException.BadVersionException ignore) {
// ignore
@@ -193,14 +199,13 @@
return false;
}
- private void updateValue(int version, byte[] bytes) {
+ private void updateValue(long zxid, int version, byte[] bytes) {
while (true) {
VersionedValue<byte[]> current = currentValue.get();
- if (current.getVersion() >= version) {
- // A newer version was concurrently set.
+ if (current.getZxid() >= zxid) {
return;
}
- if (currentValue.compareAndSet(current, new VersionedValue<byte[]>(version, bytes))) {
+ if (currentValue.compareAndSet(current, new VersionedValue<>(zxid, version, bytes))) {
// Successfully set.
return;
}
@@ -248,14 +253,14 @@
Stat localStat = new Stat();
byte[] bytes =
client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
- updateValue(localStat.getVersion(), bytes);
+ updateValue(localStat.getMzxid(), localStat.getVersion(), bytes);
}
private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
- updateValue(event.getStat().getVersion(), event.getData());
+ updateValue(event.getStat().getMzxid(), event.getStat().getVersion(), event.getData());
notifyListeners();
}
}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
index 64d9780..64f0976 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
@@ -20,23 +20,38 @@
package org.apache.curator.framework.recipes.shared;
import com.google.common.base.Preconditions;
+import java.util.function.Function;
+import org.apache.zookeeper.data.Stat;
/**
- * POJO for a version and a value
+ * POJO for versioned value.
+ *
+ * <p>Client must never construct this but get through {@link SharedValue#getVersionedValue()}
+ * or {@link SharedCount#getVersionedValue()}.
*/
public class VersionedValue<T> {
+ private final long zxid;
private final int version;
private final T value;
- /**
- * @param version the version
- * @param value the value (cannot be null)
- */
- VersionedValue(int version, T value) {
+ VersionedValue(long zxid, int version, T value) {
+ this.zxid = zxid;
this.version = version;
this.value = Preconditions.checkNotNull(value, "value cannot be null");
}
+ /**
+ * It is {@link Stat#getMzxid()} of the corresponding node.
+ */
+ public long getZxid() {
+ return zxid;
+ }
+
+ /**
+ * It is {@link Stat#getVersion()} of the corresponding node.
+ *
+ * <p>It is known that this will overflow and hence not monotonic.
+ */
public int getVersion() {
return version;
}
@@ -44,4 +59,8 @@
public T getValue() {
return value;
}
+
+ <R> VersionedValue<R> mapValue(Function<T, R> f) {
+ return new VersionedValue<>(zxid, version, f.apply(value));
+ }
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index b7b56c0..6e84388 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -21,6 +21,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -51,6 +52,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.junit.jupiter.api.Test;
+@SuppressWarnings("deprecation")
public class TestSharedCount extends CuratorTestBase {
@Test
public void testMultiClients() throws Exception {
@@ -206,13 +208,20 @@
assertEquals(count.getCount(), 10);
// Wrong value
- assertFalse(count.trySetCount(new VersionedValue<Integer>(3, 20), 7));
+ assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 3, 20), 7));
// Wrong version
- assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 10), 7));
+ assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 10, 10), 7));
+ assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid() + 1, 3, 10), 7));
// Server changed
client.setData().forPath("/count", SharedCount.toBytes(88));
assertFalse(count.trySetCount(current, 234));
+
+ assertThrows(IllegalTrySetVersionException.class, () -> {
+ VersionedValue<Integer> cached = count.getVersionedValue();
+ VersionedValue<Integer> illegal = new VersionedValue<>(cached.getZxid(), -1, cached.getValue());
+ count.trySetCount(illegal, 20);
+ });
} finally {
CloseableUtils.closeQuietly(count);
CloseableUtils.closeQuietly(client);