IGNITE-15417 Creates cursorId on client side. Fixes #307
Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index efdf85b..482489a 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.internal.metastorage.common.OperationType;
@@ -51,6 +52,7 @@
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.IgniteUuidGenerator;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
@@ -63,6 +65,9 @@
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceImpl.class);
+ /** IgniteUuid generator. */
+ private static final IgniteUuidGenerator uuidGenerator = new IgniteUuidGenerator(UUID.randomUUID(), 0);
+
/** Meta storage raft group service. */
private final RaftGroupService metaStorageRaftGrpSvc;
@@ -177,7 +182,8 @@
@Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
return new CursorImpl<>(
metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound, localNodeId)),
+ metaStorageRaftGrpSvc.run(
+ new RangeCommand(keyFrom, keyTo, revUpperBound, localNodeId, uuidGenerator.randomUuid())),
MetaStorageServiceImpl::singleEntryResult
);
}
@@ -186,7 +192,8 @@
@Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
return new CursorImpl<>(
metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, localNodeId)),
+ metaStorageRaftGrpSvc.run(
+ new RangeCommand(keyFrom, keyTo, localNodeId, uuidGenerator.randomUuid())),
MetaStorageServiceImpl::singleEntryResult
);
}
@@ -199,7 +206,7 @@
@NotNull WatchListener lsnr
) {
CompletableFuture<IgniteUuid> watchRes =
- metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision, localNodeId));
+ metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision, localNodeId, uuidGenerator.randomUuid()));
watchRes.thenAccept(
watchId -> watchProcessor.addWatch(
@@ -228,7 +235,7 @@
@NotNull WatchListener lsnr
) {
CompletableFuture<IgniteUuid> watchRes =
- metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision, localNodeId));
+ metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision, localNodeId, uuidGenerator.randomUuid()));
watchRes.thenAccept(
watchId -> watchProcessor.addWatch(
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
index d0f4482..7e6d45c 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -39,17 +40,22 @@
/** Id of the node that requests range. */
@NotNull private final String requesterNodeId;
+ /** Id of cursor that is associated with the current command. */
+ @NotNull private final IgniteUuid cursorId;
+
/**
* @param keyFrom Start key of range (inclusive).
* @param keyTo End key of range (exclusive).
* @param requesterNodeId Id of the node that requests range.
+ * @param cursorId Id of cursor that is associated with the current command.
*/
public RangeCommand(
@NotNull ByteArray keyFrom,
@Nullable ByteArray keyTo,
- @NotNull String requesterNodeId
+ @NotNull String requesterNodeId,
+ @NotNull IgniteUuid cursorId
) {
- this(keyFrom, keyTo, -1L, requesterNodeId);
+ this(keyFrom, keyTo, -1L, requesterNodeId, cursorId);
}
/**
@@ -57,17 +63,20 @@
* @param keyTo End key of range (exclusive).
* @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision.
* @param requesterNodeId Id of the node that requests range.
+ * @param cursorId Id of cursor that is associated with the current command.
*/
public RangeCommand(
@NotNull ByteArray keyFrom,
@Nullable ByteArray keyTo,
long revUpperBound,
- @NotNull String requesterNodeId
+ @NotNull String requesterNodeId,
+ @NotNull IgniteUuid cursorId
) {
this.keyFrom = keyFrom.bytes();
this.keyTo = keyTo == null ? null : keyTo.bytes();
this.revUpperBound = revUpperBound;
this.requesterNodeId = requesterNodeId;
+ this.cursorId = cursorId;
}
/**
@@ -97,4 +106,11 @@
public @NotNull String requesterNodeId() {
return requesterNodeId;
}
+
+ /**
+ * @return Id of cursor that is associated with the current command.
+ */
+ @NotNull public IgniteUuid getCursorId() {
+ return cursorId;
+ }
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
index 4e1dec0..d45d896 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -37,16 +38,20 @@
/** Id of the node that requests watch. */
@NotNull private final String requesterNodeId;
+ /** Id of cursor that is associated with the current command. */
+ @NotNull private final IgniteUuid cursorId;
+
/**
* @param keys The keys collection. Couldn't be {@code null}.
* @param revision Start revision inclusive. {@code 0} - all revisions.
* @param requesterNodeId Id of the node that requests watch.
- *
+ * @param cursorId Id of cursor that is associated with the current command.
*/
public WatchExactKeysCommand(
@NotNull Set<ByteArray> keys,
long revision,
- @NotNull String requesterNodeId
+ @NotNull String requesterNodeId,
+ @NotNull IgniteUuid cursorId
) {
this.keys = new ArrayList<>(keys.size());
@@ -56,6 +61,8 @@
this.revision = revision;
this.requesterNodeId = requesterNodeId;
+
+ this.cursorId = cursorId;
}
/**
@@ -78,4 +85,11 @@
public @NotNull String requesterNodeId() {
return requesterNodeId;
}
+
+ /**
+ * @return Id of cursor that is associated with the current command.
+ */
+ @NotNull public IgniteUuid getCursorId() {
+ return cursorId;
+ }
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
index 21bbbd6..f4d40d1 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -38,17 +39,22 @@
/** Id of the node that requests watch. */
@NotNull private final String requesterNodeId;
+ /** Id of cursor that is associated with the current command. */
+ @NotNull private final IgniteUuid cursorId;
+
/**
* @param keyFrom Start key of range (inclusive).
* @param keyTo End key of range (exclusive).
* @param requesterNodeId Id of the node that requests watch.
+ * @param cursorId Id of cursor that is associated with the current command.*
*/
public WatchRangeKeysCommand(
@Nullable ByteArray keyFrom,
@Nullable ByteArray keyTo,
- @NotNull String requesterNodeId
+ @NotNull String requesterNodeId,
+ @NotNull IgniteUuid cursorId
) {
- this(keyFrom, keyTo, 0L, requesterNodeId);
+ this(keyFrom, keyTo, 0L, requesterNodeId, cursorId);
}
/**
@@ -56,17 +62,20 @@
* @param keyTo End key of range (exclusive).
* @param revision Start revision inclusive. {@code 0} - all revisions.
* @param requesterNodeId Id of the node that requests watch.
+ * @param cursorId Id of cursor that is associated with the current command.
*/
public WatchRangeKeysCommand(
@Nullable ByteArray keyFrom,
@Nullable ByteArray keyTo,
long revision,
- @NotNull String requesterNodeId
+ @NotNull String requesterNodeId,
+ @NotNull IgniteUuid cursorId
) {
this.keyFrom = keyFrom == null ? null : keyFrom.bytes();
this.keyTo = keyTo == null ? null : keyTo.bytes();
this.revision = revision;
this.requesterNodeId = requesterNodeId;
+ this.cursorId = cursorId;
}
/**
@@ -96,4 +105,11 @@
public @NotNull String requesterNodeId() {
return requesterNodeId;
}
+
+ /**
+ * @return Id of cursor that is associated with the current command.
+ */
+ public IgniteUuid getCursorId() {
+ return cursorId;
+ }
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 952b142..98652bb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.ignite.internal.metastorage.common.ConditionType;
@@ -223,7 +222,7 @@
else if (clo.command() instanceof RangeCommand) {
RangeCommand rangeCmd = (RangeCommand) clo.command();
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+ IgniteUuid cursorId = rangeCmd.getCursorId();
Cursor<Entry> cursor = (rangeCmd.revUpperBound() != -1) ?
storage.range(
@@ -302,7 +301,7 @@
else if (clo.command() instanceof WatchRangeKeysCommand) {
WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand) clo.command();
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+ IgniteUuid cursorId = watchCmd.getCursorId();
Cursor<WatchEvent> cursor =
storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
@@ -321,7 +320,7 @@
else if (clo.command() instanceof WatchExactKeysCommand) {
WatchExactKeysCommand watchCmd = (WatchExactKeysCommand) clo.command();
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
+ IgniteUuid cursorId = watchCmd.getCursorId();
Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());