CURATOR-688. Fix SharedCount/SharedValue not updating after Stat.version overflows (#478)

Co-authored-by: Kezhu Wang <kezhuw@apache.org>
Co-authored-by: tison <wander4096@gmail.com>
Signed-off-by: tison <wander4096@gmail.com>
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
new file mode 100644
index 0000000..58e8339
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/IllegalTrySetVersionException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.framework.recipes.shared;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Thrown by {@link SharedValue#trySetValue(VersionedValue, byte[])} and
+ * {@link SharedCount#trySetCount(VersionedValue, int)} when asked to compare-and-set against version {@code -1}.
+ *
+ * <p>{@link Stat#getVersion()} is an {@code int} that can overflow and eventually reach {@code -1}.
+ * ZooKeeper's {@code setData} treats {@code -1} as "match any version", so a compare-and-set with it
+ * would silently become a blind set. The trySet call is rejected instead.
+ *
+ * <p>In case of this exception, clients have to choose:
+ * <ul>
+ *     <li>Take their own risk and do a blind set.</li>
+ *     <li>Upgrade the ZooKeeper cluster to a version that resolves
+ *     <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-4743">ZOOKEEPER-4743</a>.</li>
+ * </ul>
+ */
+public class IllegalTrySetVersionException extends IllegalArgumentException {
+    /**
+     * Creates the exception with a fixed message explaining why the version was rejected.
+     */
+    public IllegalTrySetVersionException() {
+        super("overflowed Stat.version -1 is not suitable for trySet(a.k.a. compare-and-set ZooKeeper::setData)");
+    }
+}
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
index c401a3b..2ecc528 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedCount.java
@@ -30,6 +30,7 @@
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.listen.Listenable;
 import org.apache.curator.framework.state.ConnectionState;
+import org.apache.zookeeper.data.Stat;

 /**
  * Manages a shared integer. All clients watching the same path will have the up-to-date
@@ -60,7 +61,7 @@
     @Override
     public VersionedValue<Integer> getVersionedValue() {
         VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
-        return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
+        return localValue.mapValue(SharedCount::fromBytes);
     }

     /**
@@ -102,11 +103,11 @@
      * @param newCount the new value to attempt
      * @return true if the change attempt was successful, false if not. If the change
      * was not successful, {@link #getCount()} will return the updated value
+     * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} of {@code previous} overflowed to {@code -1}
      * @throws Exception ZK errors, interruptions, etc.
      */
     public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception {
-        VersionedValue<byte[]> previousCopy =
-                new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
+        VersionedValue<byte[]> previousCopy = previous.mapValue(SharedCount::toBytes);
         return sharedValue.trySetValue(previousCopy, toBytes(newCount));
     }

diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
index 50e50ed..ddb96fc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/SharedValue.java
@@ -48,6 +48,7 @@
  * value (considering ZK's normal consistency guarantees).
  */
 public class SharedValue implements Closeable, SharedValueReader {
+    private static final long NO_ZXID = -1;
     private static final int UNINITIALIZED_VERSION = -1;

     private final Logger log = LoggerFactory.getLogger(getClass());
@@ -101,8 +102,8 @@
         this.path = PathUtils.validatePath(path);
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         this.watcher = new SharedValueCuratorWatcher();
-        currentValue = new AtomicReference<VersionedValue<byte[]>>(
-                new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+        currentValue = new AtomicReference<>(
+                new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
     }

     @VisibleForTesting
@@ -112,8 +113,8 @@
         this.seedValue = Arrays.copyOf(seedValue, seedValue.length);
         // inject watcher for testing
         this.watcher = watcher;
-        currentValue = new AtomicReference<VersionedValue<byte[]>>(
-                new VersionedValue<byte[]>(UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
+        currentValue = new AtomicReference<>(
+                new VersionedValue<>(NO_ZXID, UNINITIALIZED_VERSION, Arrays.copyOf(seedValue, seedValue.length)));
     }

     @Override
@@ -125,12 +126,11 @@
     @Override
     public VersionedValue<byte[]> getVersionedValue() {
         VersionedValue<byte[]> localCopy = currentValue.get();
-        return new VersionedValue<byte[]>(
-                localCopy.getVersion(), Arrays.copyOf(localCopy.getValue(), localCopy.getValue().length));
+        return localCopy.mapValue(bytes -> Arrays.copyOf(bytes, bytes.length));
     }

     /**
-     * Change the shared value value irrespective of its previous state
+     * Change the shared value irrespective of its previous state
      *
      * @param newValue new value
      * @throws Exception ZK errors, interruptions, etc.
@@ -139,7 +139,7 @@
         Preconditions.checkState(state.get() == State.STARTED, "not started");

         Stat result = client.setData().forPath(path, newValue);
-        updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
+        updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
     }

     /**
@@ -171,19 +171,25 @@
      * @param newValue the new value to attempt
      * @return true if the change attempt was successful, false if not. If the change
      * was not successful, {@link #getValue()} will return the updated value
+     * @throws IllegalTrySetVersionException if {@link Stat#getVersion()} of {@code previous} overflowed to {@code -1}
      * @throws Exception ZK errors, interruptions, etc.
      */
     public boolean trySetValue(VersionedValue<byte[]> previous, byte[] newValue) throws Exception {
         Preconditions.checkState(state.get() == State.STARTED, "not started");

         VersionedValue<byte[]> current = currentValue.get();
-        if (previous.getVersion() != current.getVersion() || !Arrays.equals(previous.getValue(), current.getValue())) {
+        // Match on zxid rather than version (which may overflow). The version itself is checked by
+        // ZooKeeper in setData below, so skipping it here is safe and keeps the -1 check reachable.
+        if (previous.getZxid() != current.getZxid() || !Arrays.equals(previous.getValue(), current.getValue())) {
             return false;
         }
+        if (previous.getVersion() == -1) {
+            throw new IllegalTrySetVersionException();
+        }

         try {
             Stat result = client.setData().withVersion(previous.getVersion()).forPath(path, newValue);
-            updateValue(result.getVersion(), Arrays.copyOf(newValue, newValue.length));
+            updateValue(result.getMzxid(), result.getVersion(), Arrays.copyOf(newValue, newValue.length));
             return true;
         } catch (KeeperException.BadVersionException ignore) {
             // ignore
@@ -193,14 +199,14 @@
         return false;
     }

-    private void updateValue(int version, byte[] bytes) {
+    private void updateValue(long zxid, int version, byte[] bytes) {
         while (true) {
             VersionedValue<byte[]> current = currentValue.get();
-            if (current.getVersion() >= version) {
-                // A newer version was concurrently set.
+            if (current.getZxid() >= zxid) {
+                // The same or a newer write (mzxid only grows) is already cached, e.g. set concurrently.
                 return;
             }
-            if (currentValue.compareAndSet(current, new VersionedValue<byte[]>(version, bytes))) {
+            if (currentValue.compareAndSet(current, new VersionedValue<>(zxid, version, bytes))) {
                 // Successfully set.
                 return;
             }
@@ -248,14 +254,14 @@
         Stat localStat = new Stat();
         byte[] bytes =
                 client.getData().storingStatIn(localStat).usingWatcher(watcher).forPath(path);
-        updateValue(localStat.getVersion(), bytes);
+        updateValue(localStat.getMzxid(), localStat.getVersion(), bytes);
     }

     private final BackgroundCallback upadateAndNotifyListenerCallback = new BackgroundCallback() {
         @Override
         public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
             if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
-                updateValue(event.getStat().getVersion(), event.getData());
+                updateValue(event.getStat().getMzxid(), event.getStat().getVersion(), event.getData());
                 notifyListeners();
             }
         }
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
index 64d9780..64f0976 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/shared/VersionedValue.java
@@ -20,23 +20,47 @@
 package org.apache.curator.framework.recipes.shared;

 import com.google.common.base.Preconditions;
+import java.util.function.Function;
+import org.apache.zookeeper.data.Stat;

 /**
- * POJO for a version and a value
+ * POJO for a value together with the {@link Stat#getMzxid()} and {@link Stat#getVersion()}
+ * of the ZooKeeper node it was read from or written to.
+ *
+ * <p>Clients must not construct this (the constructor is package-private); obtain instances
+ * from {@link SharedValue#getVersionedValue()} or {@link SharedCount#getVersionedValue()}.
  */
 public class VersionedValue<T> {
+    private final long zxid;
     private final int version;
     private final T value;

     /**
+     * @param zxid the {@link Stat#getMzxid()} of the node
      * @param version the version
      * @param value the value (cannot be null)
      */
-    VersionedValue(int version, T value) {
+    VersionedValue(long zxid, int version, T value) {
+        this.zxid = zxid;
         this.version = version;
         this.value = Preconditions.checkNotNull(value, "value cannot be null");
     }

+    /**
+     * Returns the {@link Stat#getMzxid()} of the node when this value was read or written,
+     * or {@code -1} if the value has not been read from ZooKeeper yet.
+     *
+     * <p>Unlike {@link #getVersion()}, this is monotonic and can be used to order values.
+     */
+    public long getZxid() {
+        return zxid;
+    }
+
+    /**
+     * Returns the {@link Stat#getVersion()} of the node.
+     *
+     * <p>The version is an {@code int} that is known to overflow, so it is not monotonic.
+     */
     public int getVersion() {
         return version;
     }
@@ -44,4 +68,14 @@
     public T getValue() {
         return value;
     }
+
+    /**
+     * Returns a copy of this with the same zxid and version and the value transformed by {@code f}.
+     *
+     * @param f mapping applied to the value; must not return {@code null}
+     * @return a new {@link VersionedValue} holding {@code f.apply(getValue())}
+     */
+    <R> VersionedValue<R> mapValue(Function<? super T, ? extends R> f) {
+        return new VersionedValue<>(zxid, version, f.apply(value));
+    }
 }
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
index b7b56c0..6e84388 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/shared/TestSharedCount.java
@@ -21,6 +21,7 @@

 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -51,6 +52,7 @@
 import org.apache.zookeeper.WatchedEvent;
 import org.junit.jupiter.api.Test;

+@SuppressWarnings("deprecation")
 public class TestSharedCount extends CuratorTestBase {
     @Test
     public void testMultiClients() throws Exception {
@@ -206,13 +208,20 @@
             assertEquals(count.getCount(), 10);

             // Wrong value
-            assertFalse(count.trySetCount(new VersionedValue<Integer>(3, 20), 7));
-            // Wrong version
-            assertFalse(count.trySetCount(new VersionedValue<Integer>(10, 10), 7));
+            assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 3, 20), 7));
+            // Wrong version, then wrong zxid
+            assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid(), 10, 10), 7));
+            assertFalse(count.trySetCount(new VersionedValue<>(current.getZxid() + 1, 3, 10), 7));

             // Server changed
             client.setData().forPath("/count", SharedCount.toBytes(88));
             assertFalse(count.trySetCount(current, 234));
+
+            assertThrows(IllegalTrySetVersionException.class, () -> {
+                VersionedValue<Integer> cached = count.getVersionedValue();
+                VersionedValue<Integer> illegal = new VersionedValue<>(cached.getZxid(), -1, cached.getValue());
+                count.trySetCount(illegal, 20);
+            });
         } finally {
             CloseableUtils.closeQuietly(count);
             CloseableUtils.closeQuietly(client);