IGNITE-15417 Creates cursorId on client side. Fixes #307

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index efdf85b..482489a 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.internal.metastorage.common.OperationType;
@@ -51,6 +52,7 @@
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
@@ -63,6 +65,9 @@
     /** The logger. */
     private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceImpl.class);
 
+    /** IgniteUuid generator. */
+    private static final IgniteUuidGenerator uuidGenerator = new IgniteUuidGenerator(UUID.randomUUID(), 0);
+
     /** Meta storage raft group service. */
     private final RaftGroupService metaStorageRaftGrpSvc;
 
@@ -177,7 +182,8 @@
     @Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
         return new CursorImpl<>(
                 metaStorageRaftGrpSvc,
-                metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound, localNodeId)),
+                metaStorageRaftGrpSvc.run(
+                    new RangeCommand(keyFrom, keyTo, revUpperBound, localNodeId, uuidGenerator.randomUuid())),
                 MetaStorageServiceImpl::singleEntryResult
         );
     }
@@ -186,7 +192,8 @@
     @Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
         return new CursorImpl<>(
                 metaStorageRaftGrpSvc,
-                metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, localNodeId)),
+                metaStorageRaftGrpSvc.run(
+                    new RangeCommand(keyFrom, keyTo, localNodeId, uuidGenerator.randomUuid())),
                 MetaStorageServiceImpl::singleEntryResult
         );
     }
@@ -199,7 +206,7 @@
         @NotNull WatchListener lsnr
     ) {
         CompletableFuture<IgniteUuid> watchRes =
-            metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision, localNodeId));
+            metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision, localNodeId, uuidGenerator.randomUuid()));
 
         watchRes.thenAccept(
             watchId -> watchProcessor.addWatch(
@@ -228,7 +235,7 @@
         @NotNull WatchListener lsnr
     ) {
         CompletableFuture<IgniteUuid> watchRes =
-            metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision, localNodeId));
+            metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision, localNodeId, uuidGenerator.randomUuid()));
 
         watchRes.thenAccept(
             watchId -> watchProcessor.addWatch(
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
index d0f4482..7e6d45c 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.common.command;
 
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -39,17 +40,22 @@
     /** Id of the node that requests range. */
     @NotNull private final String requesterNodeId;
 
+    /** Id of cursor that is associated with the current command. */
+    @NotNull private final IgniteUuid cursorId;
+
     /**
      * @param keyFrom Start key of range (inclusive).
      * @param keyTo End key of range (exclusive).
      * @param requesterNodeId Id of the node that requests range.
+     * @param cursorId Id of cursor that is associated with the current command.
      */
     public RangeCommand(
         @NotNull ByteArray keyFrom,
         @Nullable ByteArray keyTo,
-        @NotNull String requesterNodeId
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid cursorId
     ) {
-        this(keyFrom, keyTo, -1L, requesterNodeId);
+        this(keyFrom, keyTo, -1L, requesterNodeId, cursorId);
     }
 
     /**
@@ -57,17 +63,20 @@
      * @param keyTo End key of range (exclusive).
      * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision.
      * @param requesterNodeId Id of the node that requests range.
+     * @param cursorId Id of cursor that is associated with the current command.
      */
     public RangeCommand(
         @NotNull ByteArray keyFrom,
         @Nullable ByteArray keyTo,
         long revUpperBound,
-        @NotNull String requesterNodeId
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid cursorId
     ) {
         this.keyFrom = keyFrom.bytes();
         this.keyTo = keyTo == null ? null : keyTo.bytes();
         this.revUpperBound = revUpperBound;
         this.requesterNodeId = requesterNodeId;
+        this.cursorId = cursorId;
     }
 
     /**
@@ -97,4 +106,11 @@
     public @NotNull String requesterNodeId() {
         return requesterNodeId;
     }
+
+    /**
+     * @return Id of cursor that is associated with the current command.
+     */
+    @NotNull public IgniteUuid getCursorId() {
+        return cursorId;
+    }
 }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
index 4e1dec0..d45d896 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 
@@ -37,16 +38,20 @@
     /** Id of the node that requests watch. */
     @NotNull private final String requesterNodeId;
 
+    /** Id of cursor that is associated with the current command. */
+    @NotNull private final IgniteUuid cursorId;
+
     /**
      * @param keys The keys collection. Couldn't be {@code null}.
      * @param revision Start revision inclusive. {@code 0} - all revisions.
      * @param requesterNodeId Id of the node that requests watch.
-     *
+     * @param cursorId Id of cursor that is associated with the current command.
      */
     public WatchExactKeysCommand(
         @NotNull Set<ByteArray> keys,
         long revision,
-        @NotNull String requesterNodeId
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid cursorId
     ) {
         this.keys = new ArrayList<>(keys.size());
 
@@ -56,6 +61,8 @@
         this.revision = revision;
 
         this.requesterNodeId = requesterNodeId;
+
+        this.cursorId = cursorId;
     }
 
     /**
@@ -78,4 +85,11 @@
     public @NotNull String requesterNodeId() {
         return requesterNodeId;
     }
+
+    /**
+     * @return Id of cursor that is associated with the current command.
+     */
+    @NotNull public IgniteUuid getCursorId() {
+        return cursorId;
+    }
 }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
index 21bbbd6..f4d40d1 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage.common.command;
 
 import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.raft.client.WriteCommand;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -38,17 +39,22 @@
     /** Id of the node that requests watch. */
     @NotNull private final String requesterNodeId;
 
+    /** Id of cursor that is associated with the current command. */
+    @NotNull private final IgniteUuid cursorId;
+
     /**
      * @param keyFrom Start key of range (inclusive).
      * @param keyTo End key of range (exclusive).
      * @param requesterNodeId Id of the node that requests watch.
+     * @param cursorId Id of cursor that is associated with the current command.*
      */
     public WatchRangeKeysCommand(
         @Nullable ByteArray keyFrom,
         @Nullable ByteArray keyTo,
-        @NotNull String requesterNodeId
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid cursorId
     ) {
-        this(keyFrom, keyTo, 0L, requesterNodeId);
+        this(keyFrom, keyTo, 0L, requesterNodeId, cursorId);
     }
 
     /**
@@ -56,17 +62,20 @@
      * @param keyTo End key of range (exclusive).
      * @param revision Start revision inclusive. {@code 0} - all revisions.
      * @param requesterNodeId Id of the node that requests watch.
+     * @param cursorId Id of cursor that is associated with the current command.
      */
     public WatchRangeKeysCommand(
         @Nullable ByteArray keyFrom,
         @Nullable ByteArray keyTo,
         long revision,
-        @NotNull String requesterNodeId
+        @NotNull String requesterNodeId,
+        @NotNull IgniteUuid cursorId
     ) {
         this.keyFrom = keyFrom == null ? null : keyFrom.bytes();
         this.keyTo = keyTo == null ? null : keyTo.bytes();
         this.revision = revision;
         this.requesterNodeId = requesterNodeId;
+        this.cursorId = cursorId;
     }
 
     /**
@@ -96,4 +105,11 @@
     public @NotNull String requesterNodeId() {
         return requesterNodeId;
     }
+
+    /**
+     * @return Id of cursor that is associated with the current command.
+     */
+    public IgniteUuid getCursorId() {
+        return cursorId;
+    }
 }
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 952b142..98652bb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -24,7 +24,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import org.apache.ignite.internal.metastorage.common.ConditionType;
@@ -223,7 +222,7 @@
             else if (clo.command() instanceof RangeCommand) {
                 RangeCommand rangeCmd = (RangeCommand) clo.command();
 
-                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+                IgniteUuid cursorId = rangeCmd.getCursorId();
 
                 Cursor<Entry> cursor = (rangeCmd.revUpperBound() != -1) ?
                     storage.range(
@@ -302,7 +301,7 @@
             else if (clo.command() instanceof WatchRangeKeysCommand) {
                 WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
 
-                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+                IgniteUuid cursorId = watchCmd.getCursorId();
 
                 Cursor<WatchEvent> cursor =
                     storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
@@ -321,7 +320,7 @@
             else if (clo.command() instanceof WatchExactKeysCommand) {
                 WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
 
-                IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+                IgniteUuid cursorId = watchCmd.getCursorId();
 
                 Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());