Introduce Range transactions (#23)
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18174
diff --git a/accord-core/src/main/java/accord/api/Key.java b/accord-core/src/main/java/accord/api/Key.java
index 8c4078c..1504b40 100644
--- a/accord-core/src/main/java/accord/api/Key.java
+++ b/accord-core/src/main/java/accord/api/Key.java
@@ -31,5 +31,8 @@
default Key asKey() { return this; }
@Override
+ default Key slice(Range range) { return this; }
+
+ @Override
default Range asRange() { throw new UnsupportedOperationException(); }
}
diff --git a/accord-core/src/main/java/accord/api/Read.java b/accord-core/src/main/java/accord/api/Read.java
index 743b452..deff2c6 100644
--- a/accord-core/src/main/java/accord/api/Read.java
+++ b/accord-core/src/main/java/accord/api/Read.java
@@ -19,10 +19,7 @@
package accord.api;
import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
@@ -35,8 +32,8 @@
*/
public interface Read
{
- Keys keys();
- Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
+ Seekables<?, ?> keys();
+ Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store);
class ReadFuture extends AsyncPromise<Data> implements BiConsumer<Data, Throwable>
{
diff --git a/accord-core/src/main/java/accord/api/RoutingKey.java b/accord-core/src/main/java/accord/api/RoutingKey.java
index a61bfec..181bbb6 100644
--- a/accord-core/src/main/java/accord/api/RoutingKey.java
+++ b/accord-core/src/main/java/accord/api/RoutingKey.java
@@ -1,9 +1,16 @@
package accord.api;
+import accord.primitives.Range;
import accord.primitives.RoutableKey;
import accord.primitives.Unseekable;
+import accord.utils.ArrayBuffers;
+
+import java.util.Arrays;
+
+import static accord.utils.ArrayBuffers.cachedRoutingKeys;
public interface RoutingKey extends Unseekable, RoutableKey
{
@Override default RoutingKey toUnseekable() { return this; }
+ Range asRange();
}
diff --git a/accord-core/src/main/java/accord/api/Update.java b/accord-core/src/main/java/accord/api/Update.java
index e100ec4..b3eb387 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -20,6 +20,7 @@
import accord.primitives.Ranges;
import accord.primitives.Keys;
+import accord.primitives.Seekables;
import javax.annotation.Nullable;
@@ -30,7 +31,7 @@
*/
public interface Update
{
- Keys keys();
+ Seekables<?, ?> keys();
// null is provided only if nothing was read
Write apply(@Nullable Data data);
Update slice(Ranges ranges);
diff --git a/accord-core/src/main/java/accord/api/Write.java b/accord-core/src/main/java/accord/api/Write.java
index 6e5f439..d538b69 100644
--- a/accord-core/src/main/java/accord/api/Write.java
+++ b/accord-core/src/main/java/accord/api/Write.java
@@ -19,6 +19,7 @@
package accord.api;
import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import org.apache.cassandra.utils.concurrent.Future;
@@ -29,5 +30,5 @@
*/
public interface Write
{
- Future<Void> apply(Key key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
+ Future<Void> apply(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store);
}
diff --git a/accord-core/src/main/java/accord/coordinate/CheckOn.java b/accord-core/src/main/java/accord/coordinate/CheckOn.java
index a7ec61f..0bcd756 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckOn.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckOn.java
@@ -40,6 +40,7 @@
import static accord.local.PreLoadContext.contextFor;
import static accord.local.SaveStatus.NotWitnessed;
import static accord.local.Status.*;
+import static accord.primitives.Routables.Slice.Minimal;
/**
* Check on the status of a transaction. Returns early once enough information has been achieved to meet the requested
@@ -136,21 +137,23 @@
public OnDone()
{
- Ranges localRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
- PartialRoute<?> selfRoute = route().slice(localRanges);
+ Ranges sliceRanges = node.topology().localRangesForEpochs(txnId.epoch(), untilLocalEpoch);
+ Ranges covering = route().sliceCovering(sliceRanges, Minimal);
+ Unseekables<?, ?> intersectingKeys = route().slice(covering, Minimal);
+
full = (CheckStatusOkFull) merged;
- sufficientFor = full.sufficientFor(selfRoute);
+ sufficientFor = full.sufficientFor(intersectingKeys);
maxRoute = Route.merge((Route)route(), full.route);
progressKey = node.trySelectProgressKey(txnId, maxRoute);
PartialTxn partialTxn = null;
if (sufficientFor.definition.isKnown())
- partialTxn = full.partialTxn.slice(localRanges, true).reconstitutePartial(selfRoute);
+ partialTxn = full.partialTxn.slice(sliceRanges, true).reconstitutePartial(covering);
this.partialTxn = partialTxn;
PartialDeps partialDeps = null;
- if (sufficientFor.deps.isDecisionKnown())
- partialDeps = full.committedDeps.slice(localRanges).reconstitutePartial(selfRoute);
+ if (sufficientFor.deps.hasDecidedDeps())
+ partialDeps = full.committedDeps.slice(sliceRanges).reconstitutePartial(covering);
this.partialDeps = partialDeps;
}
@@ -161,7 +164,7 @@
keys = partialTxn.keys();
Iterable<TxnId> txnIds = Collections.singleton(txnId);
- if (sufficientFor.deps.isDecisionKnown())
+ if (sufficientFor.deps.hasDecidedDeps())
txnIds = Iterables.concat(txnIds, partialDeps.txnIds());
PreLoadContext loadContext = contextFor(txnIds, keys);
@@ -219,7 +222,7 @@
if (!safeStore.ranges().at(txnId.epoch()).contains(homeKey))
return null;
- Timestamp executeAt = merged.saveStatus.known.executeAt.isDecisionKnown() ? merged.executeAt : null;
+ Timestamp executeAt = merged.saveStatus.known.executeAt.hasDecidedExecuteAt() ? merged.executeAt : null;
command.setDurability(safeStore, merged.durability, homeKey, executeAt);
safeStore.progressLog().durable(command, null);
return null;
diff --git a/accord-core/src/main/java/accord/coordinate/CheckShards.java b/accord-core/src/main/java/accord/coordinate/CheckShards.java
index 5500d90..f1da841 100644
--- a/accord-core/src/main/java/accord/coordinate/CheckShards.java
+++ b/accord-core/src/main/java/accord/coordinate/CheckShards.java
@@ -42,7 +42,8 @@
@Override
protected void contact(Id id)
{
- node.send(id, new CheckStatus(txnId, contact.slice(topologies().computeRangesForNode(id)), txnId.epoch(), untilRemoteEpoch, includeInfo), this);
+ Unseekables<?, ?> unseekables = contact.slice(topologies().computeRangesForNode(id));
+ node.send(id, new CheckStatus(txnId, unseekables, txnId.epoch(), untilRemoteEpoch, includeInfo), this);
}
protected boolean isSufficient(Id from, CheckStatusOk ok) { return isSufficient(ok); }
diff --git a/accord-core/src/main/java/accord/coordinate/FetchData.java b/accord-core/src/main/java/accord/coordinate/FetchData.java
index 688e8eb..03c372f 100644
--- a/accord-core/src/main/java/accord/coordinate/FetchData.java
+++ b/accord-core/src/main/java/accord/coordinate/FetchData.java
@@ -89,7 +89,7 @@
// information to fulfil that phase locally we should downgrade the response we give to the callback
Known sufficientFor = ok.sufficientFor(fetch);
// if we discover the executeAt as part of this action, use that to decide if we requested enough info
- Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.isDecisionKnown() ? ok.executeAt : null;
+ Timestamp exec = executeAt != null ? executeAt : ok.saveStatus.known.executeAt.hasDecidedExecuteAt() ? ok.executeAt : null;
if (sufficientFor.outcome == OutcomeKnown && (exec == null || untilLocalEpoch < exec.epoch()))
sufficientFor = sufficientFor.with(OutcomeUnknown);
callback.accept(sufficientFor, null);
diff --git a/accord-core/src/main/java/accord/coordinate/Invalidate.java b/accord-core/src/main/java/accord/coordinate/Invalidate.java
index e7c8cc8..4361a15 100644
--- a/accord-core/src/main/java/accord/coordinate/Invalidate.java
+++ b/accord-core/src/main/java/accord/coordinate/Invalidate.java
@@ -19,12 +19,15 @@
package accord.coordinate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.BiConsumer;
import accord.coordinate.tracking.InvalidationTracker;
import accord.coordinate.tracking.InvalidationTracker.InvalidationShardTracker;
import accord.coordinate.tracking.RequestStatus;
+import accord.local.Node.Id;
import accord.local.Status;
import accord.messages.Commit;
import accord.primitives.*;
@@ -59,6 +62,7 @@
private final List<InvalidateReply> replies = new ArrayList<>();
private final InvalidationTracker tracker;
private Throwable failure;
+ private final Map<Id, InvalidateReply> debug = Invariants.debug() ? new HashMap<>() : null;
private Invalidate(Node node, Ballot ballot, TxnId txnId, Unseekables<?, ?> invalidateWith, boolean transitivelyInvokedByPriorInvalidation, BiConsumer<Outcome, Throwable> callback)
{
@@ -91,17 +95,18 @@
}
@Override
- public synchronized void onSuccess(Node.Id from, InvalidateReply reply)
+ public synchronized void onSuccess(Id from, InvalidateReply reply)
{
if (isDone || isPrepareDone)
return;
+ if (debug != null) debug.put(from, reply);
replies.add(reply);
handle(tracker.recordSuccess(from, reply.isPromised(), reply.acceptedFastPath));
}
@Override
- public void onFailure(Node.Id from, Throwable failure)
+ public void onFailure(Id from, Throwable failure)
{
if (isDone || isPrepareDone)
return;
@@ -227,8 +232,9 @@
// if we have witnessed the transaction, but are able to invalidate, do we want to proceed?
// Probably simplest to do so, but perhaps better for user if we don't.
- // TODO (now, rangetxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
- RoutingKey invalidateWithKey = invalidateWith.slice(Ranges.of(tracker.promisedShard().range)).get(0).someIntersectingRoutingKey();
+ Ranges ranges = Ranges.of(tracker.promisedShard().range);
+ // we look up by TxnId at the target node, so it's fine to pick a RoutingKey even if it's a range transaction
+ RoutingKey invalidateWithKey = invalidateWith.slice(ranges).get(0).someIntersectingRoutingKey(ranges);
proposeInvalidate(node, ballot, txnId, invalidateWithKey, (success, fail) -> {
/*
We're now inside our *exactly once* callback we registered with proposeInvalidate, and we need to
@@ -271,7 +277,7 @@
}
@Override
- public void onCallbackFailure(Node.Id from, Throwable failure)
+ public void onCallbackFailure(Id from, Throwable failure)
{
if (isDone)
return;
diff --git a/accord-core/src/main/java/accord/coordinate/Propose.java b/accord-core/src/main/java/accord/coordinate/Propose.java
index 3b75512..6a9f646 100644
--- a/accord-core/src/main/java/accord/coordinate/Propose.java
+++ b/accord-core/src/main/java/accord/coordinate/Propose.java
@@ -19,7 +19,9 @@
package accord.coordinate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.BiConsumer;
import accord.api.Result;
@@ -36,9 +38,11 @@
import accord.local.Node.Id;
import accord.messages.Accept;
import accord.messages.Accept.AcceptReply;
+import accord.utils.Invariants;
import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
import static accord.coordinate.tracking.RequestStatus.Failed;
+import static accord.utils.Invariants.debug;
class Propose implements Callback<AcceptReply>
{
@@ -50,6 +54,7 @@
final Deps deps;
private final List<AcceptReply> acceptOks;
+ private final Map<Id, AcceptReply> debug = debug() ? new HashMap<>() : null;
private final Timestamp executeAt;
private final QuorumTracker acceptTracker;
private final BiConsumer<Result, Throwable> callback;
@@ -80,7 +85,7 @@
Timestamp executeAt, Deps deps, BiConsumer<Result, Throwable> callback)
{
Propose propose = new Propose(node, topologies, ballot, txnId, txn, route, deps, executeAt, callback);
- node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps, txn.kind()), propose);
+ node.send(propose.acceptTracker.nodes(), to -> new Accept(to, topologies, ballot, txnId, route, executeAt, txn.keys(), deps), propose);
}
@Override
@@ -89,6 +94,8 @@
if (isDone)
return;
+ if (debug != null) debug.put(from, reply);
+
switch (reply.outcome())
{
default: throw new IllegalStateException();
diff --git a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
index 3892b04..65d569d 100644
--- a/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
+++ b/accord-core/src/main/java/accord/coordinate/ReadCoordinator.java
@@ -9,15 +9,15 @@
import accord.local.Node.Id;
import accord.primitives.TxnId;
import accord.topology.Topologies;
+import accord.utils.Invariants;
import java.util.*;
+import static accord.utils.Invariants.debug;
import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
abstract class ReadCoordinator<Reply extends accord.messages.Reply> extends ReadTracker implements Callback<Reply>
{
- private static final boolean DEBUG = false;
-
protected enum Action
{
/**
@@ -55,7 +55,7 @@
final TxnId txnId;
private boolean isDone;
private Throwable failure;
- final Map<Id, Object> debug = DEBUG ? new HashMap<>() : null;
+ final Map<Id, Object> debug = debug() ? new HashMap<>() : null;
ReadCoordinator(Node node, Topologies topologies, TxnId txnId)
{
diff --git a/accord-core/src/main/java/accord/coordinate/Recover.java b/accord-core/src/main/java/accord/coordinate/Recover.java
index ccad4e7..bcf4d28 100644
--- a/accord-core/src/main/java/accord/coordinate/Recover.java
+++ b/accord-core/src/main/java/accord/coordinate/Recover.java
@@ -19,7 +19,9 @@
package accord.coordinate;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,6 +52,7 @@
import static accord.coordinate.tracking.RequestStatus.Failed;
import static accord.coordinate.tracking.RequestStatus.Success;
import static accord.messages.BeginRecovery.RecoverOk.maxAcceptedOrLater;
+import static accord.utils.Invariants.debug;
// TODO (low priority, cleanup): rename to Recover (verb); rename Recover message to not clash
public class Recover implements Callback<RecoverReply>, BiConsumer<Result, Throwable>
@@ -123,6 +126,7 @@
private final List<RecoverOk> recoverOks = new ArrayList<>();
private final RecoveryTracker tracker;
private boolean isBallotPromised;
+ private final Map<Id, RecoverReply> debug = debug() ? new HashMap<>() : null;
private Recover(Node node, Ballot ballot, TxnId txnId, Txn txn, FullRoute<?> route, BiConsumer<Outcome, Throwable> callback, Topologies topologies)
{
@@ -179,6 +183,8 @@
if (isDone || isBallotPromised)
return;
+ if (debug != null) debug.put(from, reply);
+
if (!reply.isOk())
{
accept(null, new Preempted(txnId, route.homeKey()));
@@ -306,7 +312,7 @@
private Deps mergeDeps()
{
- Ranges ranges = recoverOks.stream().map(r -> r.deps.covering).reduce(Ranges::union).orElseThrow(NoSuchElementException::new);
+ Ranges ranges = recoverOks.stream().map(r -> r.deps.covering).reduce(Ranges::with).orElseThrow(NoSuchElementException::new);
Invariants.checkState(ranges.containsAll(txn.keys()));
return Deps.merge(recoverOks, r -> r.deps);
}
diff --git a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
index 9dfb0b1..317995b 100644
--- a/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
+++ b/accord-core/src/main/java/accord/coordinate/RecoverWithRoute.java
@@ -135,10 +135,10 @@
case OutcomeApplied:
case OutcomeKnown:
Invariants.checkState(known.definition.isKnown());
- Invariants.checkState(known.executeAt.isDecisionKnown());
+ Invariants.checkState(known.executeAt.hasDecidedExecuteAt());
// TODO (required): we might not be able to reconstitute Txn if we have GC'd on some shards
Txn txn = merged.partialTxn.reconstitute(route);
- if (known.deps.isDecisionKnown())
+ if (known.deps.hasDecidedDeps())
{
Deps deps = merged.committedDeps.reconstitute(route());
node.withEpoch(merged.executeAt.epoch(), () -> {
diff --git a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
index 42e4e29..da03da7 100644
--- a/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
+++ b/accord-core/src/main/java/accord/coordinate/tracking/FastPathTracker.java
@@ -29,6 +29,7 @@
import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.*;
+// TODO (desired, efficiency): if any shard *cannot* take the fast path, and all shards have accepted, terminate
public class FastPathTracker extends AbstractTracker<FastPathTracker.FastPathShardTracker, Node.Id>
{
private static final ShardOutcome<FastPathTracker> NewFastPathSuccess = (tracker, shardIndex) -> {
diff --git a/accord-core/src/main/java/accord/impl/CommandsForKey.java b/accord-core/src/main/java/accord/impl/CommandsForKey.java
new file mode 100644
index 0000000..91a77ca
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/CommandsForKey.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import accord.api.Key;
+import accord.local.*;
+import accord.local.SafeCommandStore.CommandFunction;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
+import accord.primitives.Keys;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class CommandsForKey implements CommandListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(CommandsForKey.class);
+
+ public interface CommandTimeseries
+ {
+ void add(Timestamp timestamp, Command command);
+ void remove(Timestamp timestamp);
+
+ boolean isEmpty();
+
+ enum TestTimestamp { BEFORE, AFTER }
+
+ /**
+ * All commands before/after (exclusive of) the given timestamp
+ *
+ * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
+ * commands that do not know any deps will be ignored.
+ *
+ * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
+ */
+ <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+ TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ CommandFunction<T, T> map, T initialValue, T terminalValue);
+ }
+
+ public abstract Key key();
+ public abstract CommandTimeseries byId();
+ public abstract CommandTimeseries byExecuteAt();
+
+ public abstract Timestamp max();
+ protected abstract void updateMax(Timestamp timestamp);
+
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return PreLoadContext.contextFor(caller, Keys.of(key()));
+ }
+
+ @Override
+ public void onChange(SafeCommandStore safeStore, Command command)
+ {
+ logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
+ key(), command.txnId(), command.status(), command);
+ updateMax(command.executeAt());
+ switch (command.status())
+ {
+ default: throw new AssertionError();
+ case PreAccepted:
+ case NotWitnessed:
+ case Accepted:
+ case AcceptedInvalidate:
+ case PreCommitted:
+ break;
+ case Applied:
+ case PreApplied:
+ case Committed:
+ case ReadyToExecute:
+ byExecuteAt().remove(command.txnId());
+ byExecuteAt().add(command.executeAt(), command);
+ break;
+ case Invalidated:
+ byId().remove(command.txnId());
+ byExecuteAt().remove(command.txnId());
+ command.removeListener(this);
+ break;
+ }
+ }
+
+ public void register(Command command)
+ {
+ updateMax(command.executeAt());
+ byId().add(command.txnId(), command);
+ byExecuteAt().add(command.txnId(), command);
+ command.addListener(this);
+ }
+
+ public boolean isEmpty()
+ {
+ return byId().isEmpty();
+ }
+}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommand.java b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
index 8525046..8c49b94 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommand.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommand.java
@@ -24,6 +24,8 @@
import accord.local.Status.Durability;
import accord.local.Status.Known;
import accord.primitives.*;
+import accord.primitives.Txn.Kind;
+import accord.utils.Invariants;
import javax.annotation.Nullable;
import java.util.*;
@@ -129,7 +131,7 @@
@Override
protected void setRoute(Route<?> route)
{
- this.route = route;
+ this.route = Invariants.checkArgument(route, !route.isEmpty());
}
@Override
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index b2444ec..66a3439 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -18,7 +18,7 @@
package accord.impl;
-import accord.local.CommandStore; // java8 fails compilation if this is in correct position
+import accord.local.*;
import accord.local.SyncCommandStores.SyncCommandStore; // java8 fails compilation if this is in correct position
import accord.api.Agent;
import accord.api.DataStore;
@@ -26,15 +26,9 @@
import accord.api.ProgressLog;
import accord.impl.InMemoryCommandStore.SingleThread.AsyncState;
import accord.impl.InMemoryCommandStore.Synchronized.SynchronizedState;
-import accord.local.Command;
-import accord.local.CommandsForKey;
-import accord.local.CommandListener;
-import accord.local.NodeTimeService;
-import accord.local.PreLoadContext;
-import accord.local.SafeCommandStore;
-import accord.local.SyncCommandStores;
import accord.local.CommandStores.RangesForEpochHolder;
import accord.local.CommandStores.RangesForEpoch;
+import accord.impl.CommandsForKey.CommandTimeseries;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.*;
@@ -42,18 +36,20 @@
import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.Future;
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.TreeMap;
+import javax.annotation.Nullable;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BinaryOperator;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.stream.Collectors;
-// TODO (low priority): efficiency
+import static accord.local.SafeCommandStore.TestDep.*;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.Status.Committed;
+import static accord.primitives.Routables.Slice.Minimal;
+
public class InMemoryCommandStore
{
public static abstract class State implements SafeCommandStore
@@ -68,6 +64,25 @@
private final CommandStore commandStore;
private final NavigableMap<TxnId, Command> commands = new TreeMap<>();
private final NavigableMap<RoutableKey, InMemoryCommandsForKey> commandsForKey = new TreeMap<>();
+ // TODO (find library, efficiency): this is obviously super inefficient, need some range map
+ private final TreeMap<TxnId, RangeCommand> rangeCommands = new TreeMap<>();
+
+ static class RangeCommand
+ {
+ final Command command;
+ Ranges ranges;
+
+ RangeCommand(Command command)
+ {
+ this.command = command;
+ }
+
+ void update(Ranges add)
+ {
+ if (ranges == null) ranges = add;
+ else ranges = ranges.with(add);
+ }
+ }
public State(NodeTimeService time, Agent agent, DataStore store, ProgressLog progressLog, RangesForEpochHolder rangesForEpoch, CommandStore commandStore)
{
@@ -103,7 +118,6 @@
return commands.containsKey(txnId);
}
- @Override
public CommandsForKey commandsForKey(Key key)
{
return commandsForKey.computeIfAbsent(key, k -> new InMemoryCommandsForKey((Key) k));
@@ -114,7 +128,6 @@
return commandsForKey.containsKey(key);
}
- @Override
public CommandsForKey maybeCommandsForKey(Key key)
{
return commandsForKey.get(key);
@@ -187,7 +200,14 @@
private Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
{
- return mapReduce(keysOrRanges, slice, CommandsForKey::max, (a, b) -> a.compareTo(b) >= 0 ? a : b, Timestamp.NONE);
+ Timestamp timestamp = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> Timestamp.max(forKey.max(), prev), Timestamp.NONE, null);
+ Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
+ for (RangeCommand command : rangeCommands.values())
+ {
+ if (command.ranges.intersects(sliced))
+ timestamp = Timestamp.max(timestamp, command.command.executeAt());
+ }
+ return timestamp;
}
public void forEpochCommands(Ranges ranges, long epoch, Consumer<Command> consumer)
@@ -220,38 +240,176 @@
range.endInclusive()).values();
for (InMemoryCommandsForKey commands : rangeCommands)
{
- commands.committedByExecuteAt()
+ commands.byExecuteAt()
.between(minTimestamp, maxTimestamp)
+ .filter(command -> command.hasBeen(Committed))
.forEach(consumer);
}
}
}
@Override
- public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+ public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
- switch (keysOrRanges.kindOfContents()) {
- default:
- throw new AssertionError();
+ accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
+ CommandTimeseries timeseries;
+ switch (testTimestamp)
+ {
+ default: throw new AssertionError();
+ case STARTED_AFTER:
+ case STARTED_BEFORE:
+ timeseries = forKey.byId();
+ break;
+ case EXECUTES_AFTER:
+ case MAY_EXECUTE_BEFORE:
+ timeseries = forKey.byExecuteAt();
+ }
+ CommandTimeseries.TestTimestamp remapTestTimestamp;
+ switch (testTimestamp)
+ {
+ default: throw new AssertionError();
+ case STARTED_AFTER:
+ case EXECUTES_AFTER:
+ remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
+ break;
+ case STARTED_BEFORE:
+ case MAY_EXECUTE_BEFORE:
+ remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
+ }
+ return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+ }, accumulate, terminalValue);
+
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+
+ // TODO (find lib, efficiency): this is super inefficient, need to store Command in something queryable
+ Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal);
+ Map<Range, List<Command>> collect = new TreeMap<>(Range::compare);
+ for (RangeCommand rangeCommand : rangeCommands.values())
+ {
+ Command command = rangeCommand.command;
+ switch (testTimestamp)
+ {
+ default: throw new AssertionError();
+ case STARTED_AFTER:
+ if (command.txnId().compareTo(timestamp) < 0) continue;
+ else break;
+ case STARTED_BEFORE:
+ if (command.txnId().compareTo(timestamp) > 0) continue;
+ else break;
+ case EXECUTES_AFTER:
+ if (command.executeAt().compareTo(timestamp) < 0) continue;
+ else break;
+ case MAY_EXECUTE_BEFORE:
+ Timestamp compareTo = command.known().executeAt.hasDecidedExecuteAt() ? command.executeAt() : command.txnId();
+ if (compareTo.compareTo(timestamp) > 0) continue;
+ else break;
+ }
+
+ if (minStatus != null && command.status().compareTo(minStatus) < 0)
+ continue;
+
+ if (maxStatus != null && command.status().compareTo(maxStatus) > 0)
+ continue;
+
+ if (testKind == Ws && command.txnId().rw().isRead())
+ continue;
+
+ if (testDep != ANY_DEPS)
+ {
+ if (!command.known().deps.hasProposedOrDecidedDeps())
+ continue;
+
+ if ((testDep == WITH) == !command.partialDeps().contains(depId))
+ continue;
+ }
+
+ if (!rangeCommand.ranges.intersects(sliced))
+ continue;
+
+ Routables.foldl(rangeCommand.ranges, sliced, (r, in, i) -> {
+ // TODO (easy, efficiency): pass command as a parameter to Fold
+ List<Command> list = in.computeIfAbsent(r, ignore -> new ArrayList<>());
+ if (list.isEmpty() || list.get(list.size() - 1) != command)
+ list.add(command);
+ return in;
+ }, collect);
+ }
+
+ for (Map.Entry<Range, List<Command>> e : collect.entrySet())
+ {
+ for (Command command : e.getValue())
+ accumulate = map.apply(e.getKey(), command.txnId(), command.executeAt(), accumulate);
+ }
+
+ return accumulate;
+ }
+
+ @Override
+ public void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command)
+ {
+ switch (keysOrRanges.domain())
+ {
+ default: throw new AssertionError();
case Key:
- AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
- return keys.stream()
- .filter(slice::contains)
- .map(this::commandsForKey)
- .map(map)
- .reduce(initialValue, reduce);
+ forEach(keysOrRanges, slice, forKey -> forKey.register(command));
+ break;
case Range:
- Ranges ranges = (Ranges) keysOrRanges;
- return ranges.slice(slice).stream().flatMap(range ->
- commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values().stream()
- ).map(map).reduce(initialValue, reduce);
+ rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
+ .update((Ranges)keysOrRanges);
}
}
@Override
- public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+ public void register(Seekable keyOrRange, Ranges slice, Command command)
{
- switch (keysOrRanges.kindOfContents()) {
+ switch (keyOrRange.domain())
+ {
+ default: throw new AssertionError();
+ case Key:
+ forEach(keyOrRange, slice, forKey -> forKey.register(command));
+ break;
+ case Range:
+ rangeCommands.computeIfAbsent(command.txnId(), ignore -> new RangeCommand(command))
+ .update(Ranges.of((Range)keyOrRange));
+ }
+ }
+
+ private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+ {
+ switch (keysOrRanges.domain()) {
+ default:
+ throw new AssertionError();
+ case Key:
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+ for (Key key : keys)
+ {
+ if (!slice.contains(key)) continue;
+ CommandsForKey forKey = commandsForKey(key);
+ accumulate = map.apply(forKey, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ break;
+ case Range:
+ Ranges ranges = (Ranges) keysOrRanges;
+ Ranges sliced = ranges.slice(slice, Minimal);
+ for (Range range : sliced)
+ {
+ for (CommandsForKey forKey : commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()).values())
+ {
+ accumulate = map.apply(forKey, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ }
+ return accumulate;
+ }
+
+ private void forEach(Seekables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach)
+ {
+ switch (keysOrRanges.domain()) {
default:
throw new AssertionError();
case Key:
@@ -267,8 +425,7 @@
}
}
- @Override
- public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
+ private void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach)
{
switch (keyOrRange.domain())
{
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
index 8287f45..b2a1a65 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandsForKey.java
@@ -20,35 +20,35 @@
import accord.api.Key;
import accord.local.Command;
-import accord.local.CommandsForKey;
+import accord.local.SafeCommandStore.CommandFunction;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
import accord.local.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
-import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.stream.Stream;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.*;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestDep.*;
+import static accord.local.SafeCommandStore.TestKind.Ws;
import static accord.local.Status.KnownDeps.DepsUnknown;
import static accord.local.Status.PreAccepted;
+import static accord.local.Status.PreCommitted;
public class InMemoryCommandsForKey extends CommandsForKey
{
- public static class InMemoryCommandTimeseries<T> implements CommandTimeseries<T>
+ public static class InMemoryCommandTimeseries implements CommandTimeseries
{
private final NavigableMap<Timestamp, Command> commands = new TreeMap<>();
+ private final Key key;
- final Function<Command, T> map;
-
- InMemoryCommandTimeseries(Function<Command, T> map)
+ public InMemoryCommandTimeseries(Key key)
{
- this.map = map;
+ this.key = key;
}
@Override
@@ -74,25 +74,27 @@
}
@Override
- public Stream<T> before(@Nonnull Timestamp timestamp, @Nonnull TestKind testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull TestStatus testStatus, @Nullable Status status)
+ public <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+ TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ CommandFunction<T, T> map, T initialValue, T terminalValue)
{
- return commands.headMap(timestamp, false).values().stream()
- .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
- // If we don't have any dependencies, we treat a dependency filter as a mismatch
- .filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps != DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
- .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status))
- .map(map);
- }
- @Override
- public Stream<T> after(@Nonnull Timestamp timestamp, @Nonnull TestKind testKind, @Nonnull TestDep testDep, @Nullable TxnId depId, @Nonnull TestStatus testStatus, @Nullable Status status)
- {
- return commands.tailMap(timestamp, false).values().stream()
- .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
+ for (Command cmd : (testTimestamp == TestTimestamp.BEFORE ? commands.headMap(timestamp, false) : commands.tailMap(timestamp, false)).values())
+ {
+ if (testKind == Ws && cmd.txnId().isRead()) continue;
// If we don't have any dependencies, we treat a dependency filter as a mismatch
- .filter(cmd -> testDep == ANY_DEPS || (cmd.known().deps != DepsUnknown && (cmd.partialDeps().contains(depId) ^ (testDep == WITHOUT))))
- .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status))
- .map(map);
+ if (testDep != ANY_DEPS && (!cmd.known().deps.hasProposedOrDecidedDeps() || (cmd.partialDeps().contains(depId) != (testDep == WITH))))
+ continue;
+ if (minStatus != null && minStatus.compareTo(cmd.status()) > 0)
+ continue;
+ if (maxStatus != null && maxStatus.compareTo(cmd.status()) < 0)
+ continue;
+ initialValue = map.apply(key, cmd.txnId(), cmd.executeAt(), initialValue);
+ if (initialValue.equals(terminalValue))
+ break;
+ }
+ return initialValue;
}
public Stream<Command> between(Timestamp min, Timestamp max)
@@ -108,15 +110,16 @@
// TODO (now): add validation that anything inserted into *committedBy* has everything prior in its dependencies
private final Key key;
- private final InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted = new InMemoryCommandTimeseries<>(cmd -> new TxnIdWithExecuteAt(cmd.txnId(), cmd.executeAt()));
- private final InMemoryCommandTimeseries<TxnId> committedById = new InMemoryCommandTimeseries<>(Command::txnId);
- private final InMemoryCommandTimeseries<TxnId> committedByExecuteAt = new InMemoryCommandTimeseries<>(Command::txnId);
+ private final InMemoryCommandTimeseries byId;
+ private final InMemoryCommandTimeseries byExecuteAt;
private Timestamp max = Timestamp.NONE;
public InMemoryCommandsForKey(Key key)
{
this.key = key;
+ this.byId = new InMemoryCommandTimeseries(key);
+ this.byExecuteAt = new InMemoryCommandTimeseries(key);
}
@Override
@@ -138,29 +141,20 @@
}
@Override
- public InMemoryCommandTimeseries<TxnIdWithExecuteAt> uncommitted()
+ public InMemoryCommandTimeseries byId()
{
- return uncommitted;
+ return byId;
}
@Override
- public InMemoryCommandTimeseries<TxnId> committedById()
+ public InMemoryCommandTimeseries byExecuteAt()
{
- return committedById;
- }
-
- @Override
- public InMemoryCommandTimeseries<TxnId> committedByExecuteAt()
- {
- return committedByExecuteAt;
+ return byExecuteAt;
}
public void forWitnessed(Timestamp minTs, Timestamp maxTs, Consumer<Command> consumer)
{
- uncommitted().between(minTs, maxTs)
- .filter(cmd -> cmd.hasBeen(PreAccepted)).forEach(consumer);
- committedById().between(minTs, maxTs).forEach(consumer);
- committedByExecuteAt().between(minTs, maxTs)
- .filter(cmd -> cmd.txnId().compareTo(minTs) < 0 || cmd.txnId().compareTo(maxTs) > 0).forEach(consumer);
+ byId.between(minTs, maxTs).filter(cmd -> cmd.hasBeen(PreAccepted)).forEach(consumer);
+ byExecuteAt.between(minTs, maxTs).filter(cmd -> cmd.hasBeen(PreCommitted)).forEach(consumer);
}
}
diff --git a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
index fa9d48b..546dc72 100644
--- a/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/SimpleProgressLog.java
@@ -515,8 +515,7 @@
private void invalidate(Node node, TxnId txnId, Unseekables<?, ?> someKeys)
{
setProgress(Investigating);
- // TODO (now, rangetxns): This should be a Routable, or we should guarantee it is safe to operate on any key in the range
- RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey();
+ RoutingKey someKey = Route.isRoute(someKeys) ? (Route.castToRoute(someKeys)).homeKey() : someKeys.get(0).someIntersectingRoutingKey(null);
someKeys = someKeys.with(someKey);
debugInvestigating = Invalidate.invalidate(node, txnId, someKeys, (success, fail) -> {
commandStore.execute(PreLoadContext.empty(), ignore -> {
diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java
index 16c7a71..ef3709f 100644
--- a/accord-core/src/main/java/accord/local/Command.java
+++ b/accord-core/src/main/java/accord/local/Command.java
@@ -22,7 +22,6 @@
import accord.local.Status.Durability;
import accord.local.Status.Known;
import accord.primitives.*;
-import accord.primitives.Txn.Kind;
import accord.primitives.Writes;
import accord.utils.Invariants;
import org.apache.cassandra.utils.concurrent.Future;
@@ -103,6 +102,11 @@
* progressKey is a local value that defines the local shard responsible for ensuring progress on the transaction.
* This will be homeKey if it is owned by the node, and some other key otherwise. If not the home shard, the progress
* shard has much weaker responsibilities, only ensuring that the home shard has durably witnessed the txnId.
+ *
+ * TODO (expected, efficiency): we probably do not want to save this on its own, as we probably want to
+ * minimize IO interactions and discrete registers, so will likely reference commit log entries directly
+ * At which point we may impose a requirement that only a Route can be saved, not a homeKey on its own.
+ * Once this restriction is imposed, we no longer need to pass around Routable.Domain with TxnId.
*/
public abstract RoutingKey homeKey();
protected abstract void setHomeKey(RoutingKey key);
@@ -237,6 +241,7 @@
}
Ranges coordinateRanges = coordinateRanges(safeStore);
+ Invariants.checkState(!coordinateRanges.isEmpty());
ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
if (!validate(Ranges.EMPTY, coordinateRanges, shard, route, Set, partialTxn, Set, null, Ignore))
throw new IllegalStateException();
@@ -278,7 +283,7 @@
return true;
}
- public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, Kind kind, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
+ public AcceptOutcome accept(SafeCommandStore safeStore, Ballot ballot, PartialRoute<?> route, Seekables<?, ?> keys, @Nullable RoutingKey progressKey, Timestamp executeAt, PartialDeps partialDeps)
{
if (this.promised().compareTo(ballot) > 0)
{
@@ -292,33 +297,29 @@
return AcceptOutcome.Redundant;
}
- if (known().isDefinitionKnown() && txnId().rw() != kind)
- throw new IllegalArgumentException("Transaction kind is different to the definition we have already received");
-
TxnId txnId = txnId();
Ranges coordinateRanges = coordinateRanges(safeStore);
Ranges acceptRanges = txnId.epoch() == executeAt.epoch() ? coordinateRanges : safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+ Invariants.checkState(!acceptRanges.isEmpty());
ProgressShard shard = progressShard(safeStore, route, progressKey, coordinateRanges);
if (!validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set))
- {
- validate(coordinateRanges, Ranges.EMPTY, shard, route, Ignore, null, Ignore, partialDeps, Set);
throw new AssertionError("Invalid response from validate function");
- }
setExecuteAt(executeAt);
setPromised(ballot);
setAccepted(ballot);
- set(safeStore, coordinateRanges, Ranges.EMPTY, shard, route, null, Ignore, partialDeps, Set);
- switch (status())
- {
- // if we haven't already registered, do so, to correctly maintain max per-key timestamp
- case NotWitnessed:
- case AcceptedInvalidate:
- safeStore.forEach(keys, acceptRanges, forKey -> forKey.register(this));
- }
- setStatus(Accepted);
+ // TODO (desired, clarity/efficiency): we don't need to set the route here, and perhaps we don't even need to
+ // distributed partialDeps at all, since all we gain is not waiting for these transactions to commit during
+ // recovery. We probably don't want to directly persist a Route in any other circumstances, either, to ease persistence.
+ set(safeStore, coordinateRanges, acceptRanges, shard, route, null, Ignore, partialDeps, Set);
+
+ // set only registers by transaction keys, which we mightn't already have received
+ if (!known().isDefinitionKnown())
+ safeStore.register(keys, acceptRanges, this);
+
+ setStatus(Accepted);
safeStore.progressLog().accepted(this, shard);
notifyListeners(safeStore);
@@ -407,7 +408,8 @@
protected void populateWaitingOn(SafeCommandStore safeStore)
{
Ranges ranges = safeStore.ranges().since(executeAt().epoch());
- if (ranges != null) {
+ if (ranges != null)
+ {
partialDeps().forEach(ranges, txnId -> {
Command command = safeStore.ifLoaded(txnId);
if (command == null)
@@ -417,7 +419,8 @@
}
else
{
- switch (command.status()) {
+ switch (command.status())
+ {
default:
throw new IllegalStateException();
case NotWitnessed:
@@ -601,6 +604,7 @@
switch (status())
{
case Committed:
+ // TODO (desirable, efficiency): maintain distinct ReadyToRead and ReadyToWrite states
setStatus(ReadyToExecute);
logger.trace("{}: set to ReadyToExecute", txnId());
safeStore.progressLog().readyToExecute(this, shard);
@@ -608,7 +612,10 @@
break;
case PreApplied:
- if (executeRanges(safeStore, executeAt()).intersects(writes().keys))
+ Ranges executeRanges = executeRanges(safeStore, executeAt());
+ boolean intersects = writes().keys.intersects(executeRanges);
+
+ if (intersects)
{
logger.trace("{}: applying", txnId());
apply(safeStore);
@@ -920,48 +927,34 @@
return false;
// first validate route
- if (shard.isProgress())
+ if (shard.isHome())
{
- // validate route
- if (shard.isHome())
+ switch (ensureRoute)
{
- switch (ensureRoute)
- {
- default: throw new AssertionError();
- case Check:
- if (!isFullRoute(route()) && !isFullRoute(route))
- return false;
- case Ignore:
- break;
- case Add:
- case Set:
- if (!isFullRoute(route))
- throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
- break;
- case TrySet:
- if (!isFullRoute(route))
- return false;
- }
+ default: throw new AssertionError();
+ case Check:
+ if (!isFullRoute(route()) && !isFullRoute(route))
+ return false;
+ case Ignore:
+ break;
+ case Add:
+ case Set:
+ if (!isFullRoute(route))
+ throw new IllegalArgumentException("Incomplete route (" + route + ") sent to home shard");
+ break;
+ case TrySet:
+ if (!isFullRoute(route))
+ return false;
}
- else if (route() == null)
- {
- // failing any of these tests is always an illegal state
- if (!route.covers(existingRanges))
- return false;
+ }
+ else
+ {
+ // failing any of these tests is always an illegal state
+ if (!route.covers(existingRanges))
+ return false;
- if (existingRanges != additionalRanges && !route.covers(additionalRanges))
- throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
- }
- else if (existingRanges != additionalRanges && !route().covers(additionalRanges))
- {
- if (!route.covers(additionalRanges))
- throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
- }
- else
- {
- if (!route().covers(existingRanges))
- throw new IllegalStateException();
- }
+ if (existingRanges != additionalRanges && !route.covers(additionalRanges))
+ throw new IllegalArgumentException("Incomplete route (" + route + ") provided; does not cover " + additionalRanges);
}
// invalid to Add deps to Accepted or AcceptedInvalidate statuses, as Committed deps are not equivalent
@@ -974,7 +967,7 @@
return false;
if (partialTxn != null && txnId().rw() != partialTxn.kind())
- throw new IllegalArgumentException("Transaction has different kind to the definition we previously received");
+ throw new IllegalArgumentException("Transaction has different kind to its TxnId");
if (shard.isHome() && ensurePartialTxn != Ignore)
{
@@ -991,10 +984,10 @@
@Nullable PartialDeps partialDeps, EnsureAction ensurePartialDeps)
{
Invariants.checkState(progressKey() != null);
- Ranges allRanges = existingRanges.union(additionalRanges);
+ Ranges allRanges = existingRanges.with(additionalRanges);
if (shard.isProgress()) setRoute(Route.merge(route(), (Route)route));
- else setRoute(Route.merge(route(), (Route)route.slice(allRanges)));
+ else setRoute(route.slice(allRanges));
switch (ensurePartialTxn)
{
@@ -1006,7 +999,8 @@
{
partialTxn = partialTxn.slice(allRanges, shard.isHome());
Routables.foldlMissing((Seekables)partialTxn.keys(), partialTxn().keys(), (keyOrRange, p, v, i) -> {
- safeStore.forEach(keyOrRange, allRanges, forKey -> forKey.register(this));
+ // TODO (expected, efficiency): we may register the same ranges more than once
+ safeStore.register(keyOrRange, allRanges, this);
return v;
}, 0, 0, 1);
this.setPartialTxn(partialTxn().with(partialTxn));
@@ -1017,10 +1011,8 @@
case TrySet:
setPartialTxn(partialTxn = partialTxn.slice(allRanges, shard.isHome()));
// TODO (expected, efficiency): we may register the same ranges more than once
- safeStore.forEach(partialTxn.keys(), allRanges, forKey -> {
- // TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
- forKey.register(this);
- });
+ // TODO (desirable, efficiency): no need to register on PreAccept if already Accepted
+ safeStore.register(partialTxn.keys(), allRanges, this);
break;
}
@@ -1091,7 +1083,7 @@
}
else if (existing != null)
{
- Ranges covering = adding.union(existing);
+ Ranges covering = adding.with(existing);
Invariants.checkState(covering.containsAll(existingRanges));
if (existingRanges != additionalRanges && !covering.containsAll(additionalRanges))
{
@@ -1127,7 +1119,7 @@
return route();
if (homeKey() != null)
- return new PartialKeyRoute(Ranges.EMPTY, homeKey(), new RoutingKey[0]);
+ return PartialRoute.empty(txnId().domain(), homeKey());
return null;
}
@@ -1191,6 +1183,12 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public Range asRange()
+ {
+ throw new UnsupportedOperationException();
+ }
}
private static final NoProgressKey NO_PROGRESS_KEY = new NoProgressKey();
diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java
index 92b2271..09ad0fc 100644
--- a/accord-core/src/main/java/accord/local/CommandStores.java
+++ b/accord-core/src/main/java/accord/local/CommandStores.java
@@ -160,7 +160,7 @@
Ranges result = ranges[i++];
while (i <= j)
- result = result.union(ranges[i++]);
+ result = result.with(ranges[i++]);
return result;
}
@@ -170,7 +170,7 @@
if (i < 0) i = Math.max(0, -2 -i);
Ranges result = ranges[i++];
while (i < ranges.length)
- result = ranges[i++].union(result);
+ result = ranges[i++].with(result);
return result;
}
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
deleted file mode 100644
index 142440f..0000000
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package accord.local;
-
-import java.util.stream.Stream;
-
-import accord.api.Key;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-public abstract class CommandsForKey implements CommandListener
-{
- private static final Logger logger = LoggerFactory.getLogger(CommandsForKey.class);
-
- public interface CommandTimeseries<T>
- {
- void add(Timestamp timestamp, Command command);
- void remove(Timestamp timestamp);
-
- boolean isEmpty();
-
- /**
- * Test whether or not the dependencies of a command contain a given transaction id.
- * NOTE that this applies only to commands that have at least proposed dependencies;
- * if no dependencies are known the command will not be tested.
- */
- enum TestDep { WITH, WITHOUT, ANY_DEPS }
- enum TestStatus
- {
- IS, HAS_BEEN, ANY_STATUS;
- public static boolean test(Status test, TestStatus predicate, Status param)
- {
- return predicate == ANY_STATUS || (predicate == IS ? test == param : test.hasBeen(param));
- }
- }
- enum TestKind { Ws, RorWs}
-
- /**
- * All commands before (exclusive of) the given timestamp
- *
- * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
- * commands that do not know any deps will be ignored.
- *
- * TODO (expected, efficiency): TestDep should be asynchronous; data should not be kept memory-resident as only used for recovery
- */
- Stream<T> before(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status);
-
- /**
- * All commands after (exclusive of) the given timestamp.
- *
- * Note that {@code testDep} applies only to commands that know at least proposed deps; if specified any
- * commands that do not know any deps will be ignored.
- */
- Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status);
- }
-
- public static class TxnIdWithExecuteAt
- {
- public final TxnId txnId;
- public final Timestamp executeAt;
-
- public TxnIdWithExecuteAt(TxnId txnId, Timestamp executeAt)
- {
- this.txnId = txnId;
- this.executeAt = executeAt;
- }
- }
-
- public abstract Key key();
- public abstract CommandTimeseries<? extends TxnIdWithExecuteAt> uncommitted();
- public abstract CommandTimeseries<TxnId> committedById();
- public abstract CommandTimeseries<TxnId> committedByExecuteAt();
-
- public abstract Timestamp max();
- protected abstract void updateMax(Timestamp timestamp);
-
- @Override
- public PreLoadContext listenerPreLoadContext(TxnId caller)
- {
- return PreLoadContext.contextFor(caller, Keys.of(key()));
- }
-
- @Override
- public void onChange(SafeCommandStore safeStore, Command command)
- {
- logger.trace("[{}]: updating as listener in response to change on {} with status {} ({})",
- key(), command.txnId(), command.status(), command);
- updateMax(command.executeAt());
- switch (command.status())
- {
- default: throw new AssertionError();
- case PreAccepted:
- case NotWitnessed:
- case Accepted:
- case AcceptedInvalidate:
- case PreCommitted:
- break;
- case Applied:
- case PreApplied:
- case Committed:
- case ReadyToExecute:
- committedById().add(command.txnId(), command);
- committedByExecuteAt().add(command.executeAt(), command);
- case Invalidated:
- uncommitted().remove(command.txnId());
- command.removeListener(this);
- break;
- }
- }
-
- public void register(Command command)
- {
- updateMax(command.executeAt());
- uncommitted().add(command.txnId(), command);
- command.addListener(this);
- }
-
- public boolean isEmpty()
- {
- return uncommitted().isEmpty() && committedById().isEmpty();
- }
-}
diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java
index 15de0dc..4ccc05c 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -366,7 +366,7 @@
public Future<Result> coordinate(Txn txn)
{
- return coordinate(nextTxnId(txn.kind(), Key), txn);
+ return coordinate(nextTxnId(txn.kind(), txn.keys().domain()), txn);
}
public Future<Result> coordinate(TxnId txnId, Txn txn)
@@ -396,8 +396,9 @@
private @Nullable RoutingKey trySelectHomeKey(TxnId txnId, Seekables<?, ?> keysOrRanges)
{
- int i = (int)keysOrRanges.findNextIntersection(0, topology().localForEpoch(txnId.epoch()).ranges(), 0);
- return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey() : null;
+ Ranges owned = topology().localForEpoch(txnId.epoch()).ranges();
+ int i = (int)keysOrRanges.findNextIntersection(0, owned, 0);
+ return i >= 0 ? keysOrRanges.get(i).someIntersectingRoutingKey(owned) : null;
}
public RoutingKey selectProgressKey(TxnId txnId, Route<?> route, RoutingKey homeKey)
@@ -436,14 +437,14 @@
int i = (int)route.findNextIntersection(0, topology.ranges(), 0);
if (i < 0)
return null;
- return route.get(i).someIntersectingRoutingKey();
+ return route.get(i).someIntersectingRoutingKey(topology.ranges());
}
public RoutingKey selectRandomHomeKey(TxnId txnId)
{
Ranges ranges = topology().localForEpoch(txnId.epoch()).ranges();
Range range = ranges.get(ranges.size() == 1 ? 0 : random.nextInt(ranges.size()));
- return range.someIntersectingRoutingKey();
+ return range.someIntersectingRoutingKey(null);
}
static class RecoverFuture<T> extends AsyncFuture<T> implements BiConsumer<T, Throwable>
diff --git a/accord-core/src/main/java/accord/local/PreLoadContext.java b/accord-core/src/main/java/accord/local/PreLoadContext.java
index d00e1c4..8a56c9f 100644
--- a/accord-core/src/main/java/accord/local/PreLoadContext.java
+++ b/accord-core/src/main/java/accord/local/PreLoadContext.java
@@ -19,6 +19,7 @@
package accord.local;
import accord.api.Key;
+import accord.impl.CommandsForKey;
import accord.primitives.Keys;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
@@ -33,11 +34,21 @@
{
/**
* @return ids of the {@link Command} objects that need to be loaded into memory before this operation is run
+ *
+ * TODO (expected): this is used for Apply, NotifyWaitingOn and listenerContexts; others only use a single txnId
+ * firstly, it would be nice to simply have that txnId as a single value.
+ * In the case of Apply, we can likely avoid loading all dependent transactions, if we can track which ranges
+ * out of memory have un-applied transactions (and try not to evict those that are not applied).
+ * Either way, the information we need in memory is super minimal for secondary transactions.
*/
Iterable<TxnId> txnIds();
/**
* @return keys of the {@link CommandsForKey} objects that need to be loaded into memory before this operation is run
+ *
+ * TODO (expected, efficiency): this used for only two things: calculateDeps and CommandStore.register.
+ * Both can be done without. For range transactions calculateDeps needs to be asynchronous anyway to support
+ * potentially large scans, and for register we do not need to load into memory, we can perform a blind write.
*/
Seekables<?, ?> keys();
@@ -55,7 +66,7 @@
static PreLoadContext contextFor(TxnId txnId, Seekables<?, ?> keysOrRanges)
{
- switch (keysOrRanges.kindOfContents())
+ switch (keysOrRanges.domain())
{
default: throw new AssertionError();
case Range: return contextFor(txnId); // TODO (required, correctness): this won't work for actual range queries
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 1dc118c..8f5b2f3 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -20,12 +20,11 @@
import accord.api.Agent;
import accord.api.DataStore;
-import accord.api.Key;
import accord.api.ProgressLog;
import accord.primitives.*;
import org.apache.cassandra.utils.concurrent.Future;
-import java.util.function.BinaryOperator;
+import javax.annotation.Nullable;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -48,18 +47,39 @@
Command ifLoaded(TxnId txnId);
Command command(TxnId txnId);
- CommandsForKey commandsForKey(Key key);
- CommandsForKey maybeCommandsForKey(Key key);
-
/**
* Register a listener against the given TxnId, then load the associated transaction and invoke the listener
* with its current state.
*/
void addAndInvokeListener(TxnId txnId, CommandListener listener);
- <T> T mapReduce(Routables<?, ?> keys, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue);
- void forEach(Routables<?, ?> keys, Ranges slice, Consumer<CommandsForKey> forEach);
- void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach);
+ interface CommandFunction<I, O>
+ {
+ O apply(Seekable keyOrRange, TxnId txnId, Timestamp executeAt, I in);
+ }
+
+ enum TestTimestamp
+ {
+ STARTED_BEFORE,
+ STARTED_AFTER,
+ MAY_EXECUTE_BEFORE, // started before and uncommitted, or committed and executes before
+ EXECUTES_AFTER
+ }
+ enum TestDep { WITH, WITHOUT, ANY_DEPS }
+ enum TestKind { Ws, RorWs }
+
+ /**
+ * Visits keys first and then ranges, both in ascending order.
+ * Within each key or range visits TxnId in ascending order of queried timestamp.
+ */
+ <T> T mapReduce(Seekables<?, ?> keys, Ranges slice,
+ TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp,
+ TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ CommandFunction<T, T> map, T initialValue, T terminalValue);
+
+ void register(Seekables<?, ?> keysOrRanges, Ranges slice, Command command);
+ void register(Seekable keyOrRange, Ranges slice, Command command);
CommandStore commandStore();
DataStore dataStore();
diff --git a/accord-core/src/main/java/accord/local/SaveStatus.java b/accord-core/src/main/java/accord/local/SaveStatus.java
index 3215d1a..7e83068 100644
--- a/accord-core/src/main/java/accord/local/SaveStatus.java
+++ b/accord-core/src/main/java/accord/local/SaveStatus.java
@@ -84,20 +84,20 @@
case PreAccepted: return PreAccepted;
case AcceptedInvalidate:
// AcceptedInvalidate logically clears any proposed deps and executeAt
- if (!known.executeAt.isDecisionKnown())
+ if (!known.executeAt.hasDecidedExecuteAt())
return known.isDefinitionKnown() ? AcceptedInvalidateWithDefinition : AcceptedInvalidate;
// If we know the executeAt decision then we do not clear it, and fall-through to PreCommitted
// however, we still clear the deps, as any deps we might have previously seen proposed are now expired
// TODO (expected, consider): consider clearing Command.partialDeps in this case also
known = known.with(DepsUnknown);
case Accepted:
- if (!known.executeAt.isDecisionKnown())
+ if (!known.executeAt.hasDecidedExecuteAt())
return known.isDefinitionKnown() ? AcceptedWithDefinition : Accepted;
// if the decision is known, we're really PreCommitted
case PreCommitted:
if (known.isDefinitionKnown())
- return known.deps.isProposalKnown() ? PreCommittedWithDefinitionAndAcceptedDeps : PreCommittedWithDefinition;
- return known.deps.isProposalKnown() ? PreCommittedWithAcceptedDeps : PreCommitted;
+ return known.deps.hasProposedOrDecidedDeps() ? PreCommittedWithDefinitionAndAcceptedDeps : PreCommittedWithDefinition;
+ return known.deps.hasProposedOrDecidedDeps() ? PreCommittedWithAcceptedDeps : PreCommitted;
case Committed: return Committed;
case ReadyToExecute: return ReadyToExecute;
case PreApplied: return PreApplied;
diff --git a/accord-core/src/main/java/accord/local/Status.java b/accord-core/src/main/java/accord/local/Status.java
index 494bcc8..158bb89 100644
--- a/accord-core/src/main/java/accord/local/Status.java
+++ b/accord-core/src/main/java/accord/local/Status.java
@@ -20,9 +20,6 @@
import accord.messages.BeginRecovery;
import accord.primitives.Ballot;
-import accord.primitives.Ranges;
-import accord.primitives.Seekables;
-import accord.primitives.TxnId;
import com.google.common.base.Preconditions;
import java.util.List;
@@ -172,20 +169,20 @@
case OutcomeApplied:
case OutcomeKnown:
- if (executeAt.isDecisionKnown() && definition.isKnown() && deps.isDecisionKnown())
+ if (executeAt.hasDecidedExecuteAt() && definition.isKnown() && deps.hasDecidedDeps())
return PreApplied;
case OutcomeUnknown:
- if (executeAt.isDecisionKnown() && definition.isKnown() && deps.isDecisionKnown())
+ if (executeAt.hasDecidedExecuteAt() && definition.isKnown() && deps.hasDecidedDeps())
return Committed;
- if (executeAt.isDecisionKnown())
+ if (executeAt.hasDecidedExecuteAt())
return PreCommitted;
if (definition.isKnown())
return PreAccepted;
- if (deps.isDecisionKnown())
+ if (deps.hasDecidedDeps())
throw new IllegalStateException();
}
@@ -199,9 +196,9 @@
public boolean isDecisionKnown()
{
- if (!deps.isDecisionKnown())
+ if (!deps.hasDecidedDeps())
return false;
- Preconditions.checkState(executeAt.isDecisionKnown());
+ Preconditions.checkState(executeAt.hasDecidedExecuteAt());
return true;
}
}
@@ -229,7 +226,7 @@
NoExecuteAt,
;
- public boolean isDecisionKnown()
+ public boolean hasDecidedExecuteAt()
{
return compareTo(ExecuteAtKnown) >= 0;
}
@@ -260,14 +257,19 @@
NoDeps,
;
- public boolean isDecisionKnown()
+ public boolean hasDecidedDeps()
+ {
+ return this == DepsKnown;
+ }
+
+ public boolean isDecided()
{
return compareTo(DepsKnown) >= 0;
}
- public boolean isProposalKnown()
+ public boolean hasProposedOrDecidedDeps()
{
- return compareTo(DepsProposed) >= 0;
+ return this == DepsProposed || this == DepsKnown;
}
}
diff --git a/accord-core/src/main/java/accord/messages/Accept.java b/accord-core/src/main/java/accord/messages/Accept.java
index bb9cffd..f155ffe 100644
--- a/accord-core/src/main/java/accord/messages/Accept.java
+++ b/accord-core/src/main/java/accord/messages/Accept.java
@@ -27,7 +27,6 @@
import accord.local.Command.AcceptOutcome;
import accord.primitives.PartialDeps;
import accord.primitives.FullRoute;
-import accord.primitives.Txn;
import accord.primitives.Ballot;
import accord.local.Command;
@@ -39,7 +38,6 @@
import javax.annotation.Nullable;
import static accord.local.Command.AcceptOutcome.*;
-import static accord.messages.PreAccept.calculatePartialDeps;
// TODO (low priority, efficiency): use different objects for send and receive, so can be more efficient
// (e.g. serialize without slicing, and without unnecessary fields)
@@ -47,9 +45,9 @@
{
public static class SerializerSupport
{
- public static Accept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps, Txn.Kind kind)
+ public static Accept create(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps)
{
- return new Accept(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, ballot, executeAt, keys, partialDeps, kind);
+ return new Accept(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey, ballot, executeAt, keys, partialDeps);
}
}
@@ -57,33 +55,40 @@
public final Timestamp executeAt;
public final Seekables<?, ?> keys;
public final PartialDeps partialDeps;
- public final Txn.Kind kind;
- public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, FullRoute<?> route, Timestamp executeAt, Seekables<?, ?> keys, Deps deps, Txn.Kind kind)
+ public Accept(Id to, Topologies topologies, Ballot ballot, TxnId txnId, FullRoute<?> route, Timestamp executeAt, Seekables<?, ?> keys, Deps deps)
{
super(to, topologies, txnId, route);
this.ballot = ballot;
this.executeAt = executeAt;
this.keys = keys.slice(scope.covering());
this.partialDeps = deps.slice(scope.covering());
- this.kind = kind;
}
- private Accept(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps, Txn.Kind kind)
+ private Accept(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey, Ballot ballot, Timestamp executeAt, Seekables<?, ?> keys, PartialDeps partialDeps)
{
super(txnId, scope, waitForEpoch, minEpoch, doNotComputeProgressKey);
this.ballot = ballot;
this.executeAt = executeAt;
this.keys = keys;
this.partialDeps = partialDeps;
- this.kind = kind;
}
@Override
public synchronized AcceptReply apply(SafeCommandStore safeStore)
{
+ if (minUnsyncedEpoch < txnId.epoch())
+ {
+ // if we include unsync'd epochs, check if we intersect the ranges for coordination or execution;
+ // if not, we're just providing dependencies, and we can do that without updating our state
+ Ranges acceptRanges = safeStore.ranges().between(txnId.epoch(), executeAt.epoch());
+ if (!acceptRanges.intersects(scope))
+ return new AcceptReply(calculatePartialDeps(safeStore));
+ }
+
+ // only accept if we actually participate in the ranges - otherwise we're just looking
Command command = safeStore.command(txnId);
- switch (command.accept(safeStore, ballot, kind, scope, keys, progressKey, executeAt, partialDeps))
+ switch (command.accept(safeStore, ballot, scope, keys, progressKey, executeAt, partialDeps))
{
default: throw new IllegalStateException();
case Redundant:
@@ -92,10 +97,15 @@
return new AcceptReply(command.promised());
case Success:
// TODO (desirable, efficiency): we don't need to calculate deps if executeAt == txnId
- return new AcceptReply(calculatePartialDeps(safeStore, txnId, keys, kind, executeAt, safeStore.ranges().between(minEpoch, executeAt.epoch())));
+ return new AcceptReply(calculatePartialDeps(safeStore));
}
}
+ private PartialDeps calculatePartialDeps(SafeCommandStore safeStore)
+ {
+ return PreAccept.calculatePartialDeps(safeStore, txnId, keys, executeAt, safeStore.ranges().between(minUnsyncedEpoch, executeAt.epoch()));
+ }
+
@Override
public AcceptReply reduce(AcceptReply ok1, AcceptReply ok2)
{
@@ -117,7 +127,7 @@
@Override
public void process()
{
- node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
+ node.mapReduceConsumeLocal(this, minUnsyncedEpoch, executeAt.epoch(), this);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index 2d4bd1d..3d699c8 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -19,14 +19,12 @@
package accord.messages;
import accord.api.Result;
-import accord.local.CommandsForKey.TxnIdWithExecuteAt;
import accord.local.SafeCommandStore;
import accord.local.Status.Phase;
import accord.primitives.*;
import accord.topology.Topologies;
import java.util.List;
-import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -39,10 +37,10 @@
import java.util.Collections;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITH;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
-import static accord.local.CommandsForKey.CommandTimeseries.TestStatus.*;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestDep.WITHOUT;
+import static accord.local.SafeCommandStore.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestTimestamp.*;
import static accord.local.Status.*;
import static accord.messages.PreAccept.calculatePartialDeps;
@@ -103,9 +101,9 @@
}
PartialDeps deps = command.partialDeps();
- if (!command.known().deps.isProposalKnown())
+ if (!command.known().deps.hasProposedOrDecidedDeps())
{
- deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(), txnId, safeStore.ranges().at(txnId.epoch()));
+ deps = calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().at(txnId.epoch()));
}
boolean rejectsFastPath;
@@ -119,9 +117,9 @@
else
{
Ranges ranges = safeStore.ranges().at(txnId.epoch());
- rejectsFastPath = acceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()).anyMatch(ignore -> true);
+ rejectsFastPath = hasAcceptedStartedAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
if (!rejectsFastPath)
- rejectsFastPath = committedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys()).anyMatch(ignore -> true);
+ rejectsFastPath = hasCommittedExecutesAfterWithoutWitnessing(safeStore, txnId, ranges, partialTxn.keys());
// TODO (expected, testing): introduce some good unit tests for verifying these two functions in a real repair scenario
// committed txns with an earlier txnid and have our txnid as a dependency
@@ -145,7 +143,7 @@
RecoverOk ok2 = (RecoverOk) r2;
// set ok1 to the most recent of the two
- if (ok1.status.compareTo(ok2.status) < 0 || (ok1.status == ok2.status && ok1.accepted.compareTo(ok2.accepted) < 0))
+ if (ok1 != Status.max(ok1, ok1.status, ok1.accepted, ok2, ok2.status, ok2.accepted))
{
RecoverOk tmp = ok1;
ok1 = ok2;
@@ -160,7 +158,7 @@
Timestamp timestamp = ok1.status == PreAccepted ? Timestamp.max(ok1.executeAt, ok2.executeAt) : ok1.executeAt;
return new RecoverOk(
- txnId, ok1.status, Ballot.max(ok1.accepted, ok2.accepted), timestamp, deps,
+ txnId, ok1.status, ok1.accepted, timestamp, deps,
earlierCommittedWitness, earlierAcceptedNoWitness,
ok1.rejectsFastPath | ok2.rejectsFastPath,
ok1.writes, ok1.result);
@@ -295,64 +293,44 @@
}
}
- private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId txnId, Ranges ranges, Seekables<?, ?> keys)
+ private static Deps acceptedStartedBeforeWithoutWitnessing(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
{
try (Deps.Builder builder = Deps.builder())
{
- commandStore.forEach(keys, ranges, forKey -> {
- // accepted txns with an earlier txnid that do not have our txnid as a dependency
- /*
- * The idea here is to discover those transactions that have been Accepted without witnessing us
- * and whom may not have adopted us as dependencies as responses to the Accept. Once we have
- * reached a quorum for recovery any re-proposals will discover us. So we do not need to look
- * at AcceptedInvalidate, as these will either be invalidated OR will witness us when they propose
- * to Accept.
- *
- * Note that we treat PreCommitted as whatever status it held previously.
- * Which is to say, we expect the previously proposed dependencies (if any) to be used to evaluate this
- * condition.
- */
- forKey.uncommitted().before(txnId, RorWs, WITHOUT, txnId, HAS_BEEN, Accepted).forEach((command) -> {
- if (command.executeAt.compareTo(txnId) > 0)
- builder.add(forKey.key(), command.txnId);
- });
- });
+ commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITHOUT, startedBefore, Accepted, PreCommitted,
+ (keyOrRange, txnId, executeAt, prev) -> {
+ if (executeAt.compareTo(startedBefore) > 0)
+ builder.add(keyOrRange, txnId);
+ return builder;
+ }, builder, null);
return builder.build();
}
}
- private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId txnId, Ranges ranges, Seekables<?, ?> keys)
+ private static Deps committedStartedBeforeAndWitnessed(SafeCommandStore commandStore, TxnId startedBefore, Ranges ranges, Seekables<?, ?> keys)
{
try (Deps.Builder builder = Deps.builder())
{
- commandStore.forEach(keys, ranges, forKey -> {
- /*
- * The idea here is to discover those transactions that have been Committed and DID witness us
- * so that we can remove these from the set of acceptedStartedBeforeAndDidNotWitness
- * on other nodes, to minimise the number of transactions we try to wait for on recovery
- */
- forKey.committedById().before(txnId, RorWs, WITH, txnId, HAS_BEEN, Committed)
- .forEach(id -> builder.add(forKey.key(), id));
- });
+ commandStore.mapReduce(keys, ranges, RorWs, STARTED_BEFORE, startedBefore, WITH, startedBefore, Committed, null,
+ (keyOrRange, txnId, executeAt, prev) -> builder.add(keyOrRange, txnId), (Deps.AbstractBuilder<Deps>)builder, null);
return builder.build();
}
}
- private static Stream<? extends TxnIdWithExecuteAt> acceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
+ private static boolean hasAcceptedStartedAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
{
/*
* The idea here is to discover those transactions that were started after us and have been Accepted
* and did not witness us as part of their pre-accept round, as this means that we CANNOT have taken
* the fast path. This is central to safe recovery, as if every transaction that executes later has
* witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
- * has not witnessed us we can safely invalidate it.
+ * has not witnessed us we can safely invalidate (us).
*/
- return commandStore.mapReduce(keys, ranges, forKey ->
- forKey.uncommitted().after(startedAfter, RorWs, WITHOUT, startedAfter, HAS_BEEN, Accepted)
- , Stream::concat, Stream.empty());
+ return commandStore.mapReduce(keys, ranges, RorWs, STARTED_AFTER, startedAfter, WITHOUT, startedAfter, Accepted, PreCommitted,
+ (keyOrRange, txnId, executeAt, prev) -> true, false, true);
}
- private static Stream<TxnId> committedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
+ private static boolean hasCommittedExecutesAfterWithoutWitnessing(SafeCommandStore commandStore, TxnId startedAfter, Ranges ranges, Seekables<?, ?> keys)
{
/*
* The idea here is to discover those transactions that have been decided to execute after us
@@ -361,7 +339,7 @@
* witnessed us we are safe to propose the pre-accept timestamp regardless, whereas if any transaction
* has not witnessed us we can safely invalidate it.
*/
- return commandStore.mapReduce(keys, ranges, forKey -> forKey.committedByExecuteAt().after(startedAfter, RorWs, WITHOUT, startedAfter, HAS_BEEN, Committed),
- Stream::concat, Stream.empty());
+ return commandStore.mapReduce(keys, ranges, RorWs, EXECUTES_AFTER, startedAfter, WITHOUT, startedAfter, Committed, null,
+ (keyOrRange, txnId, executeAt, prev) -> true,false, true);
}
}
diff --git a/accord-core/src/main/java/accord/messages/CheckStatus.java b/accord-core/src/main/java/accord/messages/CheckStatus.java
index 0399e1e..5f03522 100644
--- a/accord-core/src/main/java/accord/messages/CheckStatus.java
+++ b/accord-core/src/main/java/accord/messages/CheckStatus.java
@@ -94,7 +94,7 @@
public CheckStatus(Id to, Topologies topologies, TxnId txnId, Unseekables<?, ?> query, IncludeInfo includeInfo)
{
super(txnId);
- this.query = computeScope(to, topologies, (Unseekables) query, 0, Unseekables::slice, Unseekables::union);
+ this.query = computeScope(to, topologies, (Unseekables) query, 0, Unseekables::slice, Unseekables::with);
this.startEpoch = topologies.oldestEpoch();
this.endEpoch = topologies.currentEpoch();
this.includeInfo = includeInfo;
diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java
index a498cd7..d053862 100644
--- a/accord-core/src/main/java/accord/messages/Commit.java
+++ b/accord-core/src/main/java/accord/messages/Commit.java
@@ -238,7 +238,7 @@
{
this.txnId = txnId;
int latestRelevantIndex = latestRelevantEpochIndex(to, topologies, scope);
- this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::union);
+ this.scope = computeScope(to, topologies, (Unseekables)scope, latestRelevantIndex, Unseekables::slice, Unseekables::with);
this.waitForEpoch = computeWaitForEpoch(to, topologies, latestRelevantIndex);
this.invalidateUntilEpoch = topologies.currentEpoch();
}
diff --git a/accord-core/src/main/java/accord/messages/GetDeps.java b/accord-core/src/main/java/accord/messages/GetDeps.java
index 13e9bc4..51ee9e9 100644
--- a/accord-core/src/main/java/accord/messages/GetDeps.java
+++ b/accord-core/src/main/java/accord/messages/GetDeps.java
@@ -45,14 +45,14 @@
@Override
public void process()
{
- node.mapReduceConsumeLocal(this, minEpoch, executeAt.epoch(), this);
+ node.mapReduceConsumeLocal(this, minUnsyncedEpoch, executeAt.epoch(), this);
}
@Override
public PartialDeps apply(SafeCommandStore instance)
{
- Ranges ranges = instance.ranges().between(minEpoch, executeAt.epoch());
- return calculatePartialDeps(instance, txnId, keys, kind, executeAt, ranges);
+ Ranges ranges = instance.ranges().between(minUnsyncedEpoch, executeAt.epoch());
+ return calculatePartialDeps(instance, txnId, keys, executeAt, ranges);
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/InformDurable.java b/accord-core/src/main/java/accord/messages/InformDurable.java
index 20066e4..ba5c91b 100644
--- a/accord-core/src/main/java/accord/messages/InformDurable.java
+++ b/accord-core/src/main/java/accord/messages/InformDurable.java
@@ -86,7 +86,13 @@
@Override
public void accept(Reply reply, Throwable failure)
{
- if (reply == null) throw new IllegalStateException();
+ // TODO: respond with failure
+ if (reply == null)
+ {
+ if (failure == null)
+ throw new IllegalStateException("Processed nothing on this node");
+ throw new IllegalStateException(failure);
+ }
node.reply(replyTo, replyContext, reply);
}
diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java
index bb90b40..9b171f0 100644
--- a/accord-core/src/main/java/accord/messages/PreAccept.java
+++ b/accord-core/src/main/java/accord/messages/PreAccept.java
@@ -22,7 +22,7 @@
import java.util.Objects;
import accord.local.*;
-import accord.local.CommandsForKey.CommandTimeseries.TestKind;
+import accord.local.SafeCommandStore.TestKind;
import accord.local.Node.Id;
import accord.messages.TxnRequest.WithUnsynced;
@@ -32,13 +32,12 @@
import accord.primitives.*;
-import accord.primitives.Deps;
import accord.primitives.TxnId;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.Ws;
-import static accord.local.CommandsForKey.CommandTimeseries.TestStatus.ANY_STATUS;
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.SafeCommandStore.TestTimestamp.STARTED_BEFORE;
public class PreAccept extends WithUnsynced<PreAccept.PreAcceptReply>
{
@@ -85,7 +84,7 @@
@Override
protected void process()
{
- node.mapReduceConsumeLocal(this, minEpoch, maxEpoch, this);
+ node.mapReduceConsumeLocal(this, minUnsyncedEpoch, maxEpoch, this);
}
@Override
@@ -94,13 +93,21 @@
// note: this diverges from the paper, in that instead of waiting for JoinShard,
// we PreAccept to both old and new topologies and require quorums in both.
// This necessitates sending to ALL replicas of old topology, not only electorate (as fast path may be unreachable).
+ if (minUnsyncedEpoch < txnId.epoch() && !safeStore.ranges().at(txnId.epoch()).intersects(scope))
+ {
+ // we only preaccept in the coordination epoch, but we might contact other epochs for dependencies
+ return new PreAcceptOk(txnId, txnId,
+ calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())));
+ }
+
Command command = safeStore.command(txnId);
switch (command.preaccept(safeStore, partialTxn, route != null ? route : scope, progressKey))
{
default:
case Success:
case Redundant:
- return new PreAcceptOk(txnId, command.executeAt(), calculatePartialDeps(safeStore, txnId, partialTxn.keys(), partialTxn.kind(), txnId, safeStore.ranges().between(minEpoch, txnId.epoch())));
+ return new PreAcceptOk(txnId, command.executeAt(),
+ calculatePartialDeps(safeStore, txnId, partialTxn.keys(), txnId, safeStore.ranges().between(minUnsyncedEpoch, txnId.epoch())));
case RejectedBallot:
return PreAcceptNack.INSTANCE;
@@ -209,28 +216,26 @@
}
}
- static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Txn.Kind kindOfTxn, Timestamp executeAt, Ranges ranges)
+ static PartialDeps calculatePartialDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges)
{
try (PartialDeps.Builder builder = PartialDeps.builder(ranges))
{
- return calculateDeps(commandStore, txnId, keys, kindOfTxn, executeAt, ranges, builder);
+ return calculateDeps(commandStore, txnId, keys, executeAt, ranges, builder);
}
}
- private static <T extends Deps> T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Txn.Kind kindOfTxn, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder<T> builder)
+ private static <T extends Deps> T calculateDeps(SafeCommandStore commandStore, TxnId txnId, Seekables<?, ?> keys, Timestamp executeAt, Ranges ranges, Deps.AbstractBuilder<T> builder)
{
- TestKind testKind = kindOfTxn.isWrite() ? RorWs : Ws;
- commandStore.forEach(keys, ranges, forKey -> {
- forKey.uncommitted().before(executeAt, testKind, ANY_DEPS, null, ANY_STATUS, null)
- .forEach(info -> {
- if (!info.txnId.equals(txnId)) builder.add(forKey.key(), info.txnId);
- });
- forKey.committedByExecuteAt().before(executeAt, testKind, ANY_DEPS, null, ANY_STATUS, null)
- .forEach(id -> {
- if (!id.equals(txnId)) builder.add(forKey.key(), id);
- });
- });
-
+ TestKind testKind = txnId.rw().isWrite() ? RorWs : Ws;
+ // could use MAY_EXECUTE_BEFORE to prune those we know execute later, but shouldn't usually be of much help
+ // and would need to supply !hasOrderedTxnId
+ commandStore.mapReduce(keys, ranges, testKind, STARTED_BEFORE, executeAt, ANY_DEPS, null, null, null,
+ (keyOrRange, testTxnId, testExecuteAt, in) -> {
+ // TODO (easy, efficiency): either pass txnId as parameter or encode this behaviour in a specialised builder to avoid extra allocations
+ if (testTxnId != txnId)
+ in.add(keyOrRange, testTxnId);
+ return in;
+ }, builder, null);
return builder.build();
}
diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java
index fee7e27..3da1771 100644
--- a/accord-core/src/main/java/accord/messages/ReadData.java
+++ b/accord-core/src/main/java/accord/messages/ReadData.java
@@ -67,7 +67,7 @@
super(txnId);
this.executeAtEpoch = executeAt.epoch();
int startIndex = latestRelevantEpochIndex(to, topologies, readScope);
- this.readScope = computeScope(to, topologies, (Seekables)readScope, startIndex, Seekables::slice, Seekables::union);
+ this.readScope = computeScope(to, topologies, (Seekables)readScope, startIndex, Seekables::slice, Seekables::with);
this.waitForEpoch = computeWaitForEpoch(to, topologies, startIndex);
}
@@ -256,7 +256,7 @@
@Override
public Seekables<?, ?> keys()
{
- return readScope;
+ return Keys.EMPTY;
}
@Override
diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java
index 5b9b928..ab2a6ca 100644
--- a/accord-core/src/main/java/accord/messages/TxnRequest.java
+++ b/accord-core/src/main/java/accord/messages/TxnRequest.java
@@ -42,7 +42,7 @@
{
public static abstract class WithUnsynced<R> extends TxnRequest<R>
{
- public final long minEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch?
+ public final long minUnsyncedEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch?
public final boolean doNotComputeProgressKey;
public WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route)
@@ -53,7 +53,7 @@
private WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route, int startIndex)
{
super(to, topologies, route, txnId, startIndex);
- this.minEpoch = topologies.oldestEpoch();
+ this.minUnsyncedEpoch = topologies.oldestEpoch();
this.doNotComputeProgressKey = doNotComputeProgressKey(topologies, startIndex, txnId, waitForEpoch());
Ranges ranges = topologies.forEpoch(txnId.epoch()).rangesForNode(to);
@@ -63,7 +63,6 @@
}
else if (Invariants.isParanoid())
{
- boolean intersects = route.intersects(ranges);
long progressEpoch = Math.min(waitForEpoch(), txnId.epoch());
Ranges computesRangesOn = topologies.forEpoch(progressEpoch).rangesForNode(to);
if (computesRangesOn == null)
@@ -73,10 +72,10 @@
}
}
- protected WithUnsynced(TxnId txnId, PartialRoute scope, long waitForEpoch, long minEpoch, boolean doNotComputeProgressKey)
+ protected WithUnsynced(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minUnsyncedEpoch, boolean doNotComputeProgressKey)
{
super(txnId, scope, waitForEpoch);
- this.minEpoch = minEpoch;
+ this.minUnsyncedEpoch = minUnsyncedEpoch;
this.doNotComputeProgressKey = doNotComputeProgressKey;
}
@@ -235,14 +234,6 @@
// TODO (low priority, clarity): move to Topologies
public static <I, O> O computeScope(Node.Id node, Topologies topologies, I keys, int startIndex, BiFunction<I, Ranges, O> slice, BiFunction<O, O, O> merge)
{
- O scope = computeScopeInternal(node, topologies, keys, startIndex, slice, merge);
- if (scope == null)
- throw new IllegalArgumentException("No intersection");
- return scope;
- }
-
- private static <I, O> O computeScopeInternal(Node.Id node, Topologies topologies, I keys, int startIndex, BiFunction<I, Ranges, O> slice, BiFunction<O, O, O> merge)
- {
Ranges last = null;
O scope = null;
for (int i = startIndex, mi = topologies.size() ; i < mi ; ++i)
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 6fc570a..1cb1a3f 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -53,7 +53,7 @@
}
@Override
- public final Routable.Domain kindOfContents()
+ public final Routable.Domain domain()
{
return Key;
}
@@ -83,14 +83,21 @@
}
@Override
- public final boolean intersects(AbstractRanges<?> ranges)
+ public final boolean intersects(AbstractKeys<?, ?> keys)
{
- return ranges.intersects(this);
+ return findNextIntersection(0, keys, 0) >= 0;
}
- public final int findNext(K key, int startIndex)
+ @Override
+ public final boolean intersects(AbstractRanges<?> ranges)
{
- return SortedArrays.exponentialSearch(keys, startIndex, keys.length, key);
+ return findNextIntersection(0, ranges, 0) >= 0;
+ }
+
+ @Override
+ public final int findNext(int thisIndex, RoutableKey key, SortedArrays.Search search)
+ {
+ return SortedArrays.exponentialSearch(keys, thisIndex, keys.length, key, RoutableKey::compareTo, search);
}
@Override
@@ -100,12 +107,6 @@
}
@Override
- public final int findNext(int thisIndex, K find, SortedArrays.Search search)
- {
- return SortedArrays.exponentialSearch(keys, thisIndex, size(), find, RoutableKey::compareTo, search);
- }
-
- @Override
public final long findNextIntersection(int thisIdx, AbstractRanges<?> that, int thatIdx)
{
return SortedArrays.findNextIntersectionWithMultipleMatches(this.keys, thisIdx, that.ranges, thatIdx, (RoutableKey k, Range r) -> -r.compareTo(k), Range::compareTo);
@@ -154,6 +155,8 @@
return stream().map(Object::toString).collect(Collectors.joining(",", "[", "]"));
}
+
+
// TODO (expected, efficiency): accept cached buffers
protected K[] slice(Ranges ranges, IntFunction<K[]> factory)
{
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index 740233b..0fa9185 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -30,7 +30,6 @@
import java.util.function.Function;
import static accord.utils.ArrayBuffers.cachedRanges;
-import static accord.utils.SortedArrays.Search.CEIL;
import static accord.utils.SortedArrays.Search.FAST;
import static accord.utils.SortedArrays.swapHighLow32b;
@@ -42,6 +41,7 @@
AbstractRanges(@Nonnull Range[] ranges)
{
+ // TODO (simple, validation): check ranges are non-overlapping (or make sure it's safe for all methods that they aren't)
this.ranges = Invariants.nonNull(ranges);
}
@@ -65,7 +65,7 @@
@Override
public boolean containsAll(Routables<?, ?> that)
{
- switch (that.kindOfContents())
+ switch (that.domain())
{
default: throw new AssertionError();
case Key: return containsAll((AbstractKeys<?, ?>) that);
@@ -93,6 +93,26 @@
return ((int) supersetLinearMerge(this.ranges, that.ranges)) == that.size();
}
+ public boolean intersectsAll(Routables<?, ?> that)
+ {
+ switch (that.domain())
+ {
+ default: throw new AssertionError();
+ case Key: return containsAll((AbstractKeys<?, ?>) that);
+ case Range: return intersectsAll((AbstractRanges<?>) that);
+ }
+ }
+
+ /**
+ * @return true iff {@code that} is a subset of {@code this}
+ */
+ public boolean intersectsAll(AbstractRanges<?> that)
+ {
+ if (this.isEmpty()) return that.isEmpty();
+ if (that.isEmpty()) return true;
+ return Routables.rangeFoldl(that, this, (p, v, from, to) -> v + (to - from), 0, 0, 0) == that.size();
+ }
+
@Override
public int size()
{
@@ -100,7 +120,7 @@
}
@Override
- public final Routable.Domain kindOfContents()
+ public final Routable.Domain domain()
{
return Routable.Domain.Range;
}
@@ -117,42 +137,33 @@
return size() == 0;
}
- public final boolean intersects(Routables<?, ?> keysOrRanges)
- {
- switch (keysOrRanges.kindOfContents())
- {
- default: throw new AssertionError();
- case Key: return intersects((AbstractKeys<?, ?>) keysOrRanges);
- case Range: return intersects((AbstractRanges<?>) keysOrRanges);
- }
- }
-
+ @Override
public final boolean intersects(AbstractKeys<?, ?> keys)
{
return findNextIntersection(0, keys, 0) >= 0;
}
@Override
- public boolean intersects(AbstractRanges<?> that)
+ public final boolean intersects(AbstractRanges<?> that)
{
return SortedArrays.findNextIntersection(this.ranges, 0, that.ranges, 0, Range::compareIntersecting) >= 0;
}
- public boolean intersects(Range that)
+ public final boolean intersects(Range that)
{
- return SortedArrays.binarySearch(ranges, 0, ranges.length, that, Range::compareIntersecting, SortedArrays.Search.FAST) >= 0;
+ return indexOf(that) >= 0;
}
// returns ri in low 32 bits, ki in top, or -1 if no match found
@Override
- public long findNextIntersection(int ri, AbstractKeys<?, ?> keys, int ki)
+ public final long findNextIntersection(int ri, AbstractKeys<?, ?> keys, int ki)
{
return swapHighLow32b(SortedArrays.findNextIntersectionWithMultipleMatches(keys.keys, ki, ranges, ri));
}
// returns ki in bottom 32 bits, ri in top, or -1 if no match found
@Override
- public long findNextIntersection(int thisi, AbstractRanges<?> that, int thati)
+ public final long findNextIntersection(int thisi, AbstractRanges<?> that, int thati)
{
return SortedArrays.findNextIntersectionWithMultipleMatches(ranges, thisi, that.ranges, thati, Range::compareIntersecting, Range::compareIntersecting);
}
@@ -164,18 +175,24 @@
}
@Override
- public int findNext(int thisIndex, Range find, SortedArrays.Search search)
+ public final int findNext(int thisIndex, Range find, SortedArrays.Search search)
{
return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, Range::compareIntersecting, search);
}
+ @Override
+ public final int findNext(int thisIndex, RoutableKey find, SortedArrays.Search search)
+ {
+ return SortedArrays.exponentialSearch(ranges, thisIndex, size(), find, (k, r) -> -r.compareTo(k), search);
+ }
+
/**
* Returns the ranges that intersect with any of the members of the parameter.
* DOES NOT MODIFY THE RANGES.
*/
static <RS extends AbstractRanges<?>, P> RS intersect(RS input, Unseekables<?, ?> keysOrRanges, P param, BiFunction<P, Range[], RS> constructor)
{
- switch (keysOrRanges.kindOfContents())
+ switch (keysOrRanges.domain())
{
default: throw new AssertionError();
case Range:
@@ -198,7 +215,24 @@
RS construct(Ranges covering, P param, Range[] ranges);
}
- static <RS extends AbstractRanges<?>, P> RS slice(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+ @Override
+ public final Ranges slice(Ranges ranges, Slice slice)
+ {
+ return slice(ranges, slice, this, null, (i1, i2, rs) -> new Ranges(rs));
+ }
+
+ static <RS extends AbstractRanges<?>, P> RS slice(Ranges covering, Slice slice, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+ {
+ switch (slice)
+ {
+ default: throw new AssertionError();
+ case Overlapping: return sliceOverlapping(covering, input, param, constructor);
+ case Minimal: return sliceMinimal(covering, input, param, constructor);
+ case Maximal: return sliceMaximal(covering, input, param, constructor);
+ }
+ }
+
+ static <RS extends AbstractRanges<?>, P> RS sliceOverlapping(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
{
ObjectBuffers<Range> cachedRanges = cachedRanges();
@@ -217,9 +251,102 @@
ri = (int) (lri >>> 32);
Range l = covering.ranges[li], r = input.ranges[ri];
- buffer[bufferCount++] = Range.slice(l, r);
- if (l.end().compareTo(r.end()) >= 0) ri++;
- else li++;
+ buffer[bufferCount++] = r;
+ if (l.end().compareTo(r.end()) <= 0) li++;
+ ri++;
+ }
+ Range[] result = cachedRanges.complete(buffer, bufferCount);
+ cachedRanges.discard(buffer, bufferCount);
+ return constructor.construct(covering, param, result);
+ }
+ catch (Throwable t)
+ {
+ cachedRanges.forceDiscard(buffer, bufferCount);
+ throw t;
+ }
+ }
+
+ static <RS extends AbstractRanges<?>, P> RS sliceMinimal(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+ {
+ ObjectBuffers<Range> cachedRanges = cachedRanges();
+
+ Range[] buffer = cachedRanges.get(covering.ranges.length + input.ranges.length);
+ int bufferCount = 0;
+ try
+ {
+ int li = 0, ri = 0;
+ while (true)
+ {
+ long lri = covering.findNextIntersection(li, input, ri);
+ if (lri < 0)
+ break;
+
+ if (bufferCount == buffer.length)
+ buffer = cachedRanges.resize(buffer, bufferCount, bufferCount + 1 + (bufferCount/2));
+
+ li = (int) (lri);
+ ri = (int) (lri >>> 32);
+
+ Range l = covering.ranges[li], r = input.ranges[ri];
+ RoutingKey ls = l.start(), rs = r.start(), le = l.end(), re = r.end();
+ int cs = rs.compareTo(ls), ce = re.compareTo(le);
+ if (cs >= 0 && ce <= 0)
+ {
+ buffer[bufferCount++] = r;
+ ++ri;
+ }
+ else
+ {
+ buffer[bufferCount++] = r.newRange(cs >= 0 ? rs : ls, ce <= 0 ? re : le);
+ if (ce <= 0) ++ri;
+ }
+ if (ce >= 0) li++; // le <= re
+ }
+ Range[] result = cachedRanges.complete(buffer, bufferCount);
+ cachedRanges.discard(buffer, bufferCount);
+ return constructor.construct(covering, param, result);
+ }
+ catch (Throwable t)
+ {
+ cachedRanges.forceDiscard(buffer, bufferCount);
+ throw t;
+ }
+ }
+
+ static <RS extends AbstractRanges<?>, P> RS sliceMaximal(Ranges covering, AbstractRanges<?> input, P param, SliceConstructor<P, RS> constructor)
+ {
+ ObjectBuffers<Range> cachedRanges = cachedRanges();
+
+ Range[] buffer = cachedRanges.get(covering.ranges.length + input.ranges.length);
+ int bufferCount = 0;
+ try
+ {
+ int li = 0, ri = 0;
+ while (true)
+ {
+ long lri = covering.findNextIntersection(li, input, ri);
+ if (lri < 0)
+ break;
+
+ if (bufferCount == buffer.length)
+ buffer = cachedRanges.resize(buffer, bufferCount, bufferCount + 1 + (bufferCount/2));
+
+ li = (int) (lri);
+ ri = (int) (lri >>> 32);
+
+ Range l = covering.ranges[li], r = input.ranges[ri]; // l(eft), r(right)
+ RoutingKey ls = l.start(), rs = r.start(), le = l.end(), re = r.end(); // l(eft),r(ight) s(tart),e(nd)
+ int cs = rs.compareTo(ls), ce = re.compareTo(le); // c(ompare) s(tart),e(nd)
+ if (cs <= 0 && ce >= 0)
+ {
+ buffer[bufferCount++] = r;
+ }
+ else
+ {
+ buffer[bufferCount++] = r.newRange(cs <= 0 ? rs : ls, ce >= 0 ? re : le);
+ }
+ ++li;
+ ++ri;
}
Range[] result = cachedRanges.complete(buffer, bufferCount);
cachedRanges.discard(buffer, bufferCount);
@@ -370,7 +497,7 @@
if (a == min) ai++;
else bi++;
}
- result[resultCount++] = a.subRange(start, end);
+ result[resultCount++] = a.newRange(start, end);
}
}
@@ -460,7 +587,7 @@
static Range maybeUpdateEnd(Range range, RoutingKey withEnd)
{
- return withEnd == range.end() ? range : range.subRange(range.start(), withEnd);
+ return withEnd == range.end() ? range : range.newRange(range.start(), withEnd);
}
static <RS extends AbstractRanges<?>> RS of(Function<Range[], RS> constructor, Range... ranges)
@@ -484,7 +611,7 @@
return constructor.apply(Arrays.copyOf(ranges, count));
}
- Arrays.sort(ranges, 0, count, Comparator.comparing(Range::start));
+ Arrays.sort(ranges, 0, count, Range::compare);
Range prev = ranges[0];
int removed = 0;
for (int i = 1 ; i < count ; ++i)
@@ -492,20 +619,28 @@
Range next = ranges[i];
if (prev.end().compareTo(next.start()) > 0)
{
- prev = prev.subRange(prev.start(), next.start());
if (prev.end().compareTo(next.end()) >= 0)
{
- removed++;
+ ++removed;
+ continue;
}
- else if (removed > 0)
+
+ if (prev.start().equals(next.start()))
{
- ranges[i - removed] = prev = next.subRange(prev.end(), next.end());
+ ++removed;
+ ranges[i - removed] = prev = next;
+ continue;
}
+
+ prev = prev.newRange(prev.start(), next.start());
+ ranges[i - (1 + removed)] = prev;
+ prev = next;
+ continue;
}
- else if (removed > 0)
- {
- ranges[i - removed] = prev = next;
- }
+
+ if (removed > 0)
+ ranges[i - (1 + removed)] = prev;
+ prev = next;
}
count -= removed;
diff --git a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
index 6176193..c5078ef 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
@@ -4,6 +4,7 @@
import java.util.Arrays;
+// TODO: do we need this class?
public abstract class AbstractUnseekableKeys<KS extends Unseekables<RoutingKey, ?>> extends AbstractKeys<RoutingKey, KS> implements Iterable<RoutingKey>, Unseekables<RoutingKey, KS>
{
AbstractUnseekableKeys(RoutingKey[] keys)
diff --git a/accord-core/src/main/java/accord/primitives/Deps.java b/accord-core/src/main/java/accord/primitives/Deps.java
index 5b4a984..748fbb4 100644
--- a/accord-core/src/main/java/accord/primitives/Deps.java
+++ b/accord-core/src/main/java/accord/primitives/Deps.java
@@ -237,4 +237,10 @@
{
return this.keyDeps.equals(that.keyDeps) && this.rangeDeps.equals(that.rangeDeps);
}
+
+ @Override
+ public int hashCode()
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/FullRangeRoute.java b/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
index 848311d..7696025 100644
--- a/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullRangeRoute.java
@@ -1,7 +1,6 @@
package accord.primitives;
import accord.api.RoutingKey;
-import accord.utils.Invariants;
public class FullRangeRoute extends RangeRoute implements FullRoute<Range>
{
@@ -31,26 +30,6 @@
}
@Override
- public boolean intersects(AbstractRanges<?> ranges)
- {
- return true;
- }
-
- @Override
- public FullRangeRoute with(RoutingKey withKey)
- {
- Invariants.checkArgument(contains(withKey));
- // TODO (now): remove this in favour of parent implementation - ambiguous at present
- return this;
- }
-
- @Override
- public PartialRangeRoute slice(Ranges ranges)
- {
- return slice(ranges, this, homeKey, PartialRangeRoute::new);
- }
-
- @Override
public PartialRangeRoute sliceStrict(Ranges ranges)
{
return slice(ranges);
diff --git a/accord-core/src/main/java/accord/primitives/FullRoute.java b/accord-core/src/main/java/accord/primitives/FullRoute.java
index c4d0c4e..7da052a 100644
--- a/accord-core/src/main/java/accord/primitives/FullRoute.java
+++ b/accord-core/src/main/java/accord/primitives/FullRoute.java
@@ -3,4 +3,5 @@
public interface FullRoute<T extends Unseekable> extends Route<T>, Unseekables<T, Route<T>>
{
@Override default FullRoute<T> union(Route<T> route) { return this; }
+ @Override default Ranges sliceCovering(Ranges newRanges, Slice slice) { return newRanges; }
}
diff --git a/accord-core/src/main/java/accord/primitives/KeyDeps.java b/accord-core/src/main/java/accord/primitives/KeyDeps.java
index 1ac6b65..f9887b3 100644
--- a/accord-core/src/main/java/accord/primitives/KeyDeps.java
+++ b/accord-core/src/main/java/accord/primitives/KeyDeps.java
@@ -29,6 +29,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.ArrayBuffers.*;
import static accord.utils.RelationMultiMap.*;
import static accord.utils.SortedArrays.Search.FAST;
diff --git a/accord-core/src/main/java/accord/primitives/KeyRoute.java b/accord-core/src/main/java/accord/primitives/KeyRoute.java
index 39f0653..7ea8cb1 100644
--- a/accord-core/src/main/java/accord/primitives/KeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/KeyRoute.java
@@ -7,6 +7,8 @@
import javax.annotation.Nonnull;
+import static accord.utils.ArrayBuffers.cachedRoutingKeys;
+
public abstract class KeyRoute extends AbstractUnseekableKeys<Route<RoutingKey>> implements Route<RoutingKey>
{
public final RoutingKey homeKey;
@@ -24,6 +26,13 @@
}
@Override
+ public Unseekables<RoutingKey, ?> with(Unseekables<RoutingKey, ?> with)
+ {
+ AbstractKeys<RoutingKey, ?> that = (AbstractKeys<RoutingKey, ?>) with;
+ return wrap(SortedArrays.linearUnion(keys, that.keys, cachedRoutingKeys()), that);
+ }
+
+ @Override
public RoutingKey homeKey()
{
return homeKey;
@@ -31,4 +40,17 @@
@Override
public abstract PartialKeyRoute slice(Ranges ranges);
+
+ @Override
+ public Unseekables<RoutingKey, ?> slice(Ranges ranges, Slice slice)
+ {
+ return slice(ranges);
+ }
+
+ private AbstractUnseekableKeys<?> wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
+ {
+ return wrap == keys ? this : wrap == that.keys && that instanceof AbstractUnseekableKeys<?>
+ ? (AbstractUnseekableKeys<?>) that
+ : new RoutingKeys(wrap);
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
index a4f9cee..4af763c 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -26,6 +26,7 @@
import accord.utils.ArrayBuffers.ObjectBuffers;
import static accord.utils.ArrayBuffers.cachedKeys;
+import static accord.utils.SortedArrays.isSortedUnique;
// TODO (low priority, efficiency): this should probably be a BTree
public class Keys extends AbstractKeys<Key, Keys> implements Seekables<Key, Keys>
@@ -64,14 +65,13 @@
return Arrays.equals(keys, keys1.keys);
}
- @Override
- public Keys union(Keys that)
+ public Keys with(Keys that)
{
return wrap(SortedArrays.linearUnion(keys, that.keys, cachedKeys()), that);
}
@Override
- public Keys slice(Ranges ranges)
+ public Keys slice(Ranges ranges, Slice slice)
{
return wrap(slice(ranges, Key[]::new));
}
@@ -188,11 +188,8 @@
public static Keys ofSorted(Key ... keys)
{
- for (int i = 1 ; i < keys.length ; ++i)
- {
- if (keys[i - 1].compareTo(keys[i]) >= 0)
- throw new IllegalArgumentException(Arrays.toString(keys) + " is not sorted");
- }
+ if (!isSortedUnique(keys))
+ throw new IllegalArgumentException(Arrays.toString(keys) + " is not sorted");
return new Keys(keys);
}
diff --git a/accord-core/src/main/java/accord/primitives/PartialDeps.java b/accord-core/src/main/java/accord/primitives/PartialDeps.java
index 4a633ad..72a86b1 100644
--- a/accord-core/src/main/java/accord/primitives/PartialDeps.java
+++ b/accord-core/src/main/java/accord/primitives/PartialDeps.java
@@ -43,7 +43,7 @@
public PartialDeps with(PartialDeps that)
{
Invariants.checkArgument((this.rangeDeps == null) == (that.rangeDeps == null));
- return new PartialDeps(that.covering.union(this.covering),
+ return new PartialDeps(that.covering.with(this.covering),
this.keyDeps.with(that.keyDeps),
this.rangeDeps == null ? null : this.rangeDeps.with(that.rangeDeps));
}
@@ -55,16 +55,16 @@
return new Deps(keyDeps, rangeDeps);
}
- // PartialRoute<?>might cover a wider set of ranges, some of which may have no involved keys
- public PartialDeps reconstitutePartial(PartialRoute<?> route)
+ // covering might cover a wider set of ranges, some of which may have no involved keys
+ public PartialDeps reconstitutePartial(Ranges covering)
{
- if (!covers(route))
+ if (!covers(covering))
throw new IllegalArgumentException();
- if (covers(route.covering()))
+ if (covers(covering))
return this;
- return new PartialDeps(route.covering(), keyDeps, rangeDeps);
+ return new PartialDeps(covering, keyDeps, rangeDeps);
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
index 212bffb..bd999b0 100644
--- a/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialKeyRoute.java
@@ -26,12 +26,23 @@
this.covering = covering;
}
+ public static PartialKeyRoute empty(RoutingKey homeKey)
+ {
+ return new PartialKeyRoute(Ranges.EMPTY, homeKey, RoutingKeys.EMPTY.keys);
+ }
+
@Override
public PartialKeyRoute sliceStrict(Ranges newRanges)
{
if (!covering.containsAll(newRanges))
throw new IllegalArgumentException("Not covered");
+ return slice(newRanges);
+ }
+
+ @Override
+ public PartialKeyRoute slice(Ranges newRanges)
+ {
RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
return new PartialKeyRoute(newRanges, homeKey, keys);
}
@@ -64,14 +75,13 @@
return new RoutingKeys(toRoutingKeysArray(withKey));
}
- @Override
- public PartialKeyRoute slice(Ranges newRanges)
+ public PartialKeyRoute slice(Ranges newRanges, Slice slice)
{
if (newRanges.containsAll(covering))
return this;
- RoutingKey[] keys = slice(covering, RoutingKey[]::new);
- return new PartialKeyRoute(covering, homeKey, keys);
+ RoutingKey[] keys = slice(newRanges, RoutingKey[]::new);
+ return new PartialKeyRoute(newRanges, homeKey, keys);
}
@Override
@@ -89,7 +99,7 @@
PartialKeyRoute that = (PartialKeyRoute) with;
Invariants.checkState(homeKey.equals(that.homeKey));
RoutingKey[] keys = SortedArrays.linearUnion(this.keys, that.keys, RoutingKey[]::new);
- Ranges covering = this.covering.union(that.covering);
+ Ranges covering = this.covering.with(that.covering);
if (covering == this.covering && keys == this.keys)
return this;
if (covering == that.covering && keys == that.keys)
diff --git a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
index 204d939..88db1eb 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRangeRoute.java
@@ -4,6 +4,7 @@
import accord.utils.Invariants;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
/**
* A slice of a Route that covers
@@ -26,6 +27,11 @@
this.covering = covering;
}
+ public static PartialRangeRoute empty(RoutingKey homeKey)
+ {
+ return new PartialRangeRoute(Ranges.EMPTY, homeKey, NO_RANGES);
+ }
+
@Override
public UnseekablesKind kind()
{
@@ -45,39 +51,18 @@
}
@Override
- public boolean intersects(AbstractRanges<?> ranges)
- {
- return ranges.intersects(covering);
- }
-
- @Override
- public PartialRangeRoute sliceStrict(Ranges newRange)
- {
- if (!covering.containsAll(newRange))
- throw new IllegalArgumentException("Not covered");
-
- return slice(newRange, this, homeKey, PartialRangeRoute::new);
- }
-
- @Override
public Unseekables<Range, ?> toMaximalUnseekables()
{
- throw new UnsupportedOperationException();
+ return with(homeKey);
}
@Override
- public PartialRangeRoute slice(Ranges newRanges)
+ public PartialRangeRoute sliceStrict(Ranges newRanges)
{
- if (newRanges.containsAll(covering))
- return this;
+ if (!covering.containsAll(newRanges))
+ throw new IllegalArgumentException("Not covered");
- return slice(newRanges, this, homeKey, PartialRangeRoute::new);
- }
-
- @Override
- public Unseekables<Range, ?> with(RoutingKey withKey)
- {
- throw new UnsupportedOperationException();
+ return slice(newRanges);
}
@Override
@@ -95,7 +80,7 @@
PartialRangeRoute that = (PartialRangeRoute) with;
Invariants.checkState(homeKey.equals(that.homeKey));
- Ranges covering = this.covering.union(that.covering);
+ Ranges covering = this.covering.with(that.covering);
if (covering == this.covering) return this;
else if (covering == that.covering) return that;
diff --git a/accord-core/src/main/java/accord/primitives/PartialRoute.java b/accord-core/src/main/java/accord/primitives/PartialRoute.java
index 50ebea5..2c1c915 100644
--- a/accord-core/src/main/java/accord/primitives/PartialRoute.java
+++ b/accord-core/src/main/java/accord/primitives/PartialRoute.java
@@ -1,5 +1,8 @@
package accord.primitives;
+import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
+
public interface PartialRoute<T extends Unseekable> extends Route<T>
{
@Override
@@ -10,4 +13,15 @@
* Expected to be compatible PartialRoute type, i.e. both split from the same FullRoute
*/
PartialRoute<T> union(PartialRoute<T> route);
+
+ @Override
+ default Ranges sliceCovering(Ranges newRanges, Slice slice)
+ {
+ return covering().slice(newRanges, slice);
+ }
+
+ static PartialRoute<?> empty(Domain domain, RoutingKey homeKey)
+ {
+ return domain.isKey() ? PartialKeyRoute.empty(homeKey) : PartialRangeRoute.empty(homeKey);
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index 71e7554..94f4d56 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -5,6 +5,7 @@
import accord.api.Query;
import accord.api.Read;
import accord.api.Update;
+import accord.utils.Invariants;
public interface PartialTxn extends Txn
{
@@ -12,7 +13,7 @@
// TODO (low priority, efficiency): efficient merge when more than one input
PartialTxn with(PartialTxn add);
Txn reconstitute(FullRoute<?> route);
- PartialTxn reconstitutePartial(PartialRoute<?> route);
+ PartialTxn reconstitutePartial(Ranges covering);
default boolean covers(Unseekables<?, ?> unseekables)
{
@@ -56,8 +57,8 @@
if (!add.kind().equals(kind()))
throw new IllegalArgumentException();
- Ranges covering = this.covering.union(add.covering());
- Seekables<?, ?> keys = ((Seekables)this.keys()).union(add.keys());
+ Ranges covering = this.covering.with(add.covering());
+ Seekables<?, ?> keys = ((Seekables)this.keys()).with(add.keys());
Read read = this.read().merge(add.read());
Query query = this.query() == null ? add.query() : this.query();
Update update = this.update() == null ? null : this.update().merge(add.update());
@@ -89,15 +90,15 @@
}
@Override
- public PartialTxn reconstitutePartial(PartialRoute<?> route)
+ public PartialTxn reconstitutePartial(Ranges covering)
{
- if (!covers(route))
- throw new IllegalStateException("Incomplete PartialTxn: " + this + ", route: " + route);
+ if (!covers(covering))
+ throw new IllegalStateException("Incomplete PartialTxn: " + this + ", covering: " + covering);
- if (covering.containsAll(route.covering()))
+ if (this.covering.containsAll(covering))
return this;
- return new PartialTxn.InMemory(route.covering(), kind(), keys(), read(), query(), update());
+ return new PartialTxn.InMemory(covering, kind(), keys(), read(), query(), update());
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Range.java b/accord-core/src/main/java/accord/primitives/Range.java
index 98fb673..db6c3aa 100644
--- a/accord-core/src/main/java/accord/primitives/Range.java
+++ b/accord-core/src/main/java/accord/primitives/Range.java
@@ -64,7 +64,7 @@
}
@Override
- public Range subRange(RoutingKey start, RoutingKey end)
+ public Range newRange(RoutingKey start, RoutingKey end)
{
return new EndInclusive(start, end);
}
@@ -106,7 +106,7 @@
}
@Override
- public Range subRange(RoutingKey start, RoutingKey end)
+ public Range newRange(RoutingKey start, RoutingKey end)
{
return new StartInclusive(start, end);
}
@@ -135,7 +135,7 @@
}
@Override
- public Range subRange(RoutingKey start, RoutingKey end)
+ public Range newRange(RoutingKey start, RoutingKey end)
{
throw new UnsupportedOperationException("subRange");
}
@@ -197,7 +197,7 @@
public abstract boolean startInclusive();
public abstract boolean endInclusive();
- public abstract Range subRange(RoutingKey start, RoutingKey end);
+ public abstract Range newRange(RoutingKey start, RoutingKey end);
@Override
public Key asKey()
@@ -270,6 +270,14 @@
return that.start.compareTo(this.start) >= 0 && that.end.compareTo(this.end) <= 0;
}
+ public Range slice(Range truncateTo)
+ {
+ int cs = start.compareTo(truncateTo.start);
+ int ce = end.compareTo(truncateTo.end);
+ if (cs >= 0 && ce <= 0) return this;
+ return newRange(cs >= 0 ? start : truncateTo.start, ce <= 0 ? end : truncateTo.end);
+ }
+
public boolean intersects(AbstractKeys<?, ?> keys)
{
return SortedArrays.binarySearch(keys.keys, 0, keys.size(), this, Range::compareTo, FAST) >= 0;
@@ -286,7 +294,7 @@
RoutingKey start = this.start.compareTo(that.start) > 0 ? this.start : that.start;
RoutingKey end = this.end.compareTo(that.end) < 0 ? this.end : that.end;
- return subRange(start, end);
+ return newRange(start, end);
}
/**
@@ -312,9 +320,33 @@
}
@Override
- public RoutingKey someIntersectingRoutingKey()
+ public RoutingKey someIntersectingRoutingKey(Ranges ranges)
{
- return startInclusive() ? start.toUnseekable() : end.toUnseekable();
+ if (ranges == null)
+ return startInclusive() ? start.toUnseekable() : end.toUnseekable();
+
+ int i = ranges.indexOf(this);
+ Range that = ranges.get(i);
+ if (this.start().compareTo(that.start()) <= 0)
+ {
+ if (startInclusive())
+ return that.start();
+
+ if (this.end().compareTo(that.end()) <= 0)
+ return this.end();
+
+ return that.end();
+ }
+ else
+ {
+ if (startInclusive())
+ return this.start();
+
+ if (that.end().compareTo(this.end()) <= 0)
+ return that.end();
+
+ return this.end();
+ }
}
public static Range slice(Range bound, Range toSlice)
@@ -323,7 +355,7 @@
if (bound.contains(toSlice))
return toSlice;
- return toSlice.subRange(
+ return toSlice.newRange(
toSlice.start().compareTo(bound.start()) >= 0 ? toSlice.start() : bound.start(),
toSlice.end().compareTo(bound.end()) <= 0 ? toSlice.end() : bound.end()
);
diff --git a/accord-core/src/main/java/accord/primitives/RangeRoute.java b/accord-core/src/main/java/accord/primitives/RangeRoute.java
index 8e754fd..ff30854 100644
--- a/accord-core/src/main/java/accord/primitives/RangeRoute.java
+++ b/accord-core/src/main/java/accord/primitives/RangeRoute.java
@@ -5,6 +5,9 @@
import javax.annotation.Nonnull;
+import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
+
public abstract class RangeRoute extends AbstractRanges<Route<Range>> implements Route<Range>
{
public final RoutingKey homeKey;
@@ -16,9 +19,27 @@
}
@Override
+ public Unseekables<Range, ?> with(Unseekables<Range, ?> with)
+ {
+ if (isEmpty())
+ return with;
+
+ return union(MERGE_OVERLAPPING, this, (AbstractRanges<?>) with, null, null,
+ (left, right, rs) -> Ranges.ofSortedAndDeoverlapped(rs));
+ }
+
+ public Unseekables<Range, ?> with(RoutingKey withKey)
+ {
+ if (contains(withKey))
+ return this;
+
+ return with(Ranges.of(withKey.asRange()));
+ }
+
+ @Override
public PartialRangeRoute slice(Ranges ranges)
{
- return slice(ranges, this, homeKey, PartialRangeRoute::new);
+ return slice(ranges, Overlapping, this, homeKey, PartialRangeRoute::new);
}
@Override
diff --git a/accord-core/src/main/java/accord/primitives/Ranges.java b/accord-core/src/main/java/accord/primitives/Ranges.java
index 2b8a2bc..f3332fe 100644
--- a/accord-core/src/main/java/accord/primitives/Ranges.java
+++ b/accord-core/src/main/java/accord/primitives/Ranges.java
@@ -25,6 +25,7 @@
import java.util.stream.Stream;
import static accord.primitives.AbstractRanges.UnionMode.MERGE_OVERLAPPING;
+import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.Utils.toArray;
public class Ranges extends AbstractRanges<Ranges> implements Iterable<Range>, Seekables<Range, Ranges>, Unseekables<Range, Ranges>
@@ -80,11 +81,21 @@
@Override
public Ranges slice(Ranges ranges)
{
- return slice(ranges, this, null, (i1, i2, rs) -> new Ranges(rs));
+ return slice(ranges, Overlapping);
}
@Override
- public Ranges union(Ranges that)
+ public Ranges with(Unseekables<Range, ?> that)
+ {
+ return with((AbstractRanges<?>) that);
+ }
+
+ public Ranges with(Ranges that)
+ {
+ return union(MERGE_OVERLAPPING, that);
+ }
+
+ public Ranges with(AbstractRanges<?> that)
{
return union(MERGE_OVERLAPPING, that);
}
@@ -92,7 +103,10 @@
@Override
public Unseekables<Range, ?> with(RoutingKey withKey)
{
- throw new UnsupportedOperationException();
+ if (contains(withKey))
+ return this;
+
+ return with(Ranges.of(withKey.asRange()));
}
@Override
@@ -108,8 +122,10 @@
}
@Override
- public FullRoute<Range> toRoute(RoutingKey homeKey)
+ public FullRangeRoute toRoute(RoutingKey homeKey)
{
+ if (!contains(homeKey))
+ return with(Ranges.of(homeKey.asRange())).toRoute(homeKey);
return new FullRangeRoute(homeKey, ranges);
}
@@ -122,6 +138,14 @@
});
}
+ public Ranges union(UnionMode mode, AbstractRanges<?> that)
+ {
+ return union(mode, this, that, this, that, (left, right, ranges) -> {
+ if (ranges == left.ranges) return left;
+ return new Ranges(ranges);
+ });
+ }
+
public Ranges mergeTouching()
{
return mergeTouching(this, Ranges::new);
@@ -157,7 +181,7 @@
int ecmp = thisRange.end().compareTo(thatRange.end());
if (scmp < 0)
- result.add(thisRange.subRange(thisRange.start(), thatRange.start()));
+ result.add(thisRange.newRange(thisRange.start(), thatRange.start()));
if (ecmp <= 0)
{
@@ -166,7 +190,7 @@
}
else
{
- thisRange = thisRange.subRange(thatRange.end(), thisRange.end());
+ thisRange = thisRange.newRange(thatRange.end(), thisRange.end());
thatIdx++;
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Routable.java b/accord-core/src/main/java/accord/primitives/Routable.java
index a94132b..7b77bf8 100644
--- a/accord-core/src/main/java/accord/primitives/Routable.java
+++ b/accord-core/src/main/java/accord/primitives/Routable.java
@@ -2,6 +2,8 @@
import accord.api.RoutingKey;
+import javax.annotation.Nullable;
+
/**
* Something that can be found in the cluster, and MAYBE found on disk (if Seekable)
*/
@@ -30,5 +32,9 @@
Domain domain();
Unseekable toUnseekable();
- RoutingKey someIntersectingRoutingKey();
+
+ /**
+ * Deterministically select a key that intersects this Routable and the provided Ranges
+ */
+ RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges);
}
diff --git a/accord-core/src/main/java/accord/primitives/RoutableKey.java b/accord-core/src/main/java/accord/primitives/RoutableKey.java
index c48124d..e7d78a1 100644
--- a/accord-core/src/main/java/accord/primitives/RoutableKey.java
+++ b/accord-core/src/main/java/accord/primitives/RoutableKey.java
@@ -1,8 +1,10 @@
package accord.primitives;
import accord.api.RoutingKey;
+import accord.utils.Invariants;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
public interface RoutableKey extends Routable, Comparable<RoutableKey>
{
@@ -36,7 +38,7 @@
public RoutingKey toUnseekable() { throw new UnsupportedOperationException(); }
@Override
- public RoutingKey someIntersectingRoutingKey() { throw new UnsupportedOperationException(); }
+ public RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges) { throw new UnsupportedOperationException(); }
}
/**
@@ -53,5 +55,9 @@
@Override
RoutingKey toUnseekable();
- @Override default RoutingKey someIntersectingRoutingKey() { return toUnseekable(); }
+ @Override default RoutingKey someIntersectingRoutingKey(@Nullable Ranges ranges)
+ {
+ Invariants.paranoid(ranges == null || ranges.contains(this));
+ return toUnseekable();
+ }
}
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java
index 88eb4f5..27645f9 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -1,12 +1,17 @@
package accord.primitives;
import accord.api.RoutingKey;
+import accord.primitives.Routable.Domain;
import accord.utils.IndexedFold;
import accord.utils.IndexedFoldToLong;
import accord.utils.IndexedRangeFoldToLong;
import accord.utils.SortedArrays;
import net.nicoulaj.compilecommand.annotations.Inline;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static accord.primitives.Routables.Slice.Overlapping;
import static accord.utils.SortedArrays.Search.FLOOR;
/**
@@ -14,17 +19,37 @@
*/
public interface Routables<K extends Routable, U extends Routables<K, ?>> extends Iterable<K>
{
+ enum Slice
+ {
+ /** (Default) Overlapping ranges are returned unmodified */
+ Overlapping,
+ /** Overlapping ranges are split/shrunk to the intersection of the overlaps */
+ Minimal,
+ /** Overlapping ranges are extended to the union of the overlaps */
+ Maximal
+ }
+
int indexOf(K item);
K get(int i);
int size();
boolean isEmpty();
boolean intersects(AbstractRanges<?> ranges);
+ boolean intersects(AbstractKeys<?, ?> keys);
+ default boolean intersects(Routables<?, ?> routables)
+ {
+ switch (routables.domain())
+ {
+ default: throw new AssertionError();
+ case Key: return intersects((AbstractKeys<?, ?>) routables);
+ case Range: return intersects((AbstractRanges<?>) routables);
+ }
+ }
boolean contains(RoutableKey key);
boolean containsAll(Routables<?, ?> keysOrRanges);
U slice(Ranges ranges);
- Routables<K, U> union(U with);
+ Routables<K, ?> slice(Ranges ranges, Slice slice);
/**
* Search forwards from {code thisIndex} and {@code withIndex} to find the first entries in each collection
@@ -55,34 +80,81 @@
/**
* Perform {@link SortedArrays#exponentialSearch} from {@code thisIndex} looking for {@code find} with behaviour of {@code search}
*/
- int findNext(int thisIndex, K find, SortedArrays.Search search);
+ int findNext(int thisIndex, RoutableKey find, SortedArrays.Search search);
- Routable.Domain kindOfContents();
+ Domain domain();
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable, T> T foldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Input, T> fold, T initialValue)
{
return Helper.foldl(Routables::findNextIntersection, Helper::findLimit, inputs, matching, fold, initialValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * If the inputs are ranges, narrow them to the parts the intersect with {@code matching}, so that we never visit
+ * any portion of a {@code Range} that is not in {@code matching} (See {@link Slice#Minimal}).
+ * Terminate once we hit {@code terminalValue}.
+ */
+ @Inline
+ static <T> T foldlMinimal(Seekables<?, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Seekable, T> fold, T initialValue)
+ {
+ return Helper.foldlMinimal(inputs, matching, fold, initialValue);
+ }
+
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends RoutableKey, T> T foldl(AbstractKeys<Input, ?> inputs, AbstractRanges<?> matching, IndexedFold<? super Input, T> fold, T initialValue)
{
return Helper.foldl(AbstractKeys::findNextIntersection, Helper::findLimit, inputs, matching, fold, initialValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
+ @Inline
+ static <T> T foldl(AbstractRanges<?> inputs, Routables<?, ?> matching, IndexedFold<? super Range, T> fold, T initialValue)
+ {
+ switch (matching.domain())
+ {
+ default: throw new AssertionError();
+ case Key: return Helper.foldl(AbstractRanges::findNextIntersection, Helper::findLimit, inputs, (AbstractKeys<?,?>)matching, fold, initialValue);
+ case Range: return Helper.foldl(AbstractRanges::findNextIntersection, Helper::findLimit, inputs, (AbstractRanges<?>)matching, fold, initialValue);
+ }
+ }
+
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends RoutableKey> long foldl(AbstractKeys<Input, ?> inputs, AbstractRanges<?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
{
return Helper.foldl(AbstractKeys::findNextIntersection, Helper::findLimit, inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable> long foldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
{
return Helper.foldl(Routables::findNextIntersection, Helper::findLimit, inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable> long foldl(Routables<Input, ?> inputs, AbstractKeys<?, ?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
{
@@ -90,6 +162,10 @@
inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends RoutingKey, Matching extends Routable> long foldl(AbstractKeys<Input, ?> inputs, Routables<Matching, ?> matching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
{
@@ -97,6 +173,10 @@
inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order, passing the contiguous ranges that intersect to the IndexedRangeFold function.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable> long rangeFoldl(Routables<Input, ?> inputs, AbstractRanges<?> matching, IndexedRangeFoldToLong fold, long param, long initialValue, long terminalValue)
{
@@ -104,6 +184,10 @@
inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order, passing the contiguous ranges that intersect to the IndexedRangeFold function.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable> long rangeFoldl(Routables<Input, ?> inputs, AbstractKeys<?, ?> matching, IndexedRangeFoldToLong fold, long param, long initialValue, long terminalValue)
{
@@ -111,6 +195,10 @@
inputs, matching, fold, param, initialValue, terminalValue);
}
+ /**
+ * Fold-left over the {@code inputs} that <b>do not</b> intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
@Inline
static <Input extends Routable> long foldlMissing(Routables<Input, ?> inputs, Routables<Input, ?> notMatching, IndexedFoldToLong<? super Input> fold, long param, long initialValue, long terminalValue)
{
@@ -131,6 +219,31 @@
}
@Inline
+ static <T> T foldlMinimal(Seekables<?, ?> is, AbstractRanges<?> ms, IndexedFold<? super Seekable, T> fold, T initialValue)
+ {
+ int i = 0, m = 0;
+ while (true)
+ {
+ long im = is.findNextIntersection(i, ms, m);
+ if (im < 0)
+ break;
+
+ i = (int)(im);
+ m = (int)(im >>> 32);
+
+ Range mv = ms.get(m);
+ int nexti = Helper.findLimit(is, i, ms, m);
+ while (i < nexti)
+ {
+ initialValue = fold.apply(is.get(i).slice(mv), initialValue, i);
+ ++i;
+ }
+ }
+
+ return initialValue;
+ }
+
+ @Inline
static <Input extends Routable, Inputs extends Routables<Input, ?>, Matches extends Routables<?, ?>, T>
T foldl(SetIntersections<Inputs, Matches> setIntersections, ValueIntersections<Inputs, Matches> valueIntersections,
Inputs is, Matches ms, IndexedFold<? super Input, T> fold, T initialValue)
@@ -245,5 +358,15 @@
else nextl++;
return nextl;
}
+
+ static int findLimit(AbstractRanges<?> ls, int li, AbstractKeys<?, ?> rs, int ri)
+ {
+ RoutableKey r = rs.get(ri);
+
+ int nextl = ls.findNext(li + 1, r, FLOOR);
+ if (nextl < 0) nextl = -1 - nextl;
+ else nextl++;
+ return nextl;
+ }
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Route.java b/accord-core/src/main/java/accord/primitives/Route.java
index 01b5ebd..b144da9 100644
--- a/accord-core/src/main/java/accord/primitives/Route.java
+++ b/accord-core/src/main/java/accord/primitives/Route.java
@@ -9,14 +9,22 @@
RoutingKey homeKey();
default boolean isRoute() { return true; }
+
boolean covers(Ranges ranges);
- @Override
- boolean intersects(AbstractRanges<?> ranges);
- @Override
+
+ /**
+ * Return an object containing any {@code K} present in either of the original collections,
+ * and covering the union of the ranges.
+ *
+ * Differs from {@link Unseekables#with} in that the parameter must be a {@link Route}
+ * and the result will be a {@link Route}.
+ */
Route<K> union(Route<K> route);
+
@Override
PartialRoute<K> slice(Ranges ranges);
PartialRoute<K> sliceStrict(Ranges ranges);
+ Ranges sliceCovering(Ranges ranges, Slice slice);
/**
* @return a PureRoutables that includes every shard we know of, not just those we contact
@@ -36,7 +44,7 @@
if (unseekables == null)
return null;
- switch (unseekables.kindOfContents())
+ switch (unseekables.domain())
{
default: throw new AssertionError();
case Key: return (FullKeyRoute) unseekables;
@@ -49,7 +57,7 @@
if (unseekables == null)
return null;
- switch (unseekables.kindOfContents())
+ switch (unseekables.domain())
{
default: throw new AssertionError();
case Key: return (KeyRoute) unseekables;
@@ -86,7 +94,7 @@
if (unseekables == null)
return null;
- switch (unseekables.kindOfContents())
+ switch (unseekables.domain())
{
default: throw new AssertionError();
case Key: return (PartialKeyRoute) unseekables;
diff --git a/accord-core/src/main/java/accord/primitives/RoutingKeys.java b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
index 0e29954..e611811 100644
--- a/accord-core/src/main/java/accord/primitives/RoutingKeys.java
+++ b/accord-core/src/main/java/accord/primitives/RoutingKeys.java
@@ -28,16 +28,17 @@
}
@Override
- public RoutingKeys union(AbstractUnseekableKeys<?> that)
+ public Unseekables<RoutingKey, ?> with(Unseekables<RoutingKey, ?> with)
{
+ AbstractKeys<RoutingKey, ?> that = (AbstractKeys<RoutingKey, ?>) with;
return wrap(SortedArrays.linearUnion(keys, that.keys, cachedRoutingKeys()), that);
}
- @Override
public RoutingKeys with(RoutingKey with)
{
if (contains(with))
return this;
+
return wrap(toRoutingKeysArray(with));
}
@@ -53,6 +54,11 @@
return wrap(slice(ranges, RoutingKey[]::new));
}
+ public RoutingKeys slice(Ranges ranges, Slice slice)
+ {
+ return slice(ranges);
+ }
+
private RoutingKeys wrap(RoutingKey[] wrap, AbstractKeys<RoutingKey, ?> that)
{
return wrap == keys ? this : wrap == that.keys && that instanceof RoutingKeys ? (RoutingKeys)that : new RoutingKeys(wrap);
@@ -62,5 +68,4 @@
{
return wrap == keys ? this : new RoutingKeys(wrap);
}
-
}
diff --git a/accord-core/src/main/java/accord/primitives/Seekable.java b/accord-core/src/main/java/accord/primitives/Seekable.java
index f702353..3e55f41 100644
--- a/accord-core/src/main/java/accord/primitives/Seekable.java
+++ b/accord-core/src/main/java/accord/primitives/Seekable.java
@@ -9,4 +9,5 @@
{
Key asKey();
Range asRange();
+ Seekable slice(Range range);
}
diff --git a/accord-core/src/main/java/accord/primitives/Seekables.java b/accord-core/src/main/java/accord/primitives/Seekables.java
index 0bed766..cea483e 100644
--- a/accord-core/src/main/java/accord/primitives/Seekables.java
+++ b/accord-core/src/main/java/accord/primitives/Seekables.java
@@ -2,15 +2,20 @@
import accord.api.RoutingKey;
+import static accord.primitives.Routables.Slice.Overlapping;
+
/**
* Either a Route or a collection of Routable
*/
public interface Seekables<K extends Seekable, U extends Seekables<K, ?>> extends Routables<K, U>
{
@Override
- U slice(Ranges ranges);
+ default U slice(Ranges ranges) { return slice(ranges, Overlapping); }
+
@Override
- Seekables<K, U> union(U with);
+ U slice(Ranges ranges, Slice slice);
+ Seekables<K, U> with(U with);
+
Unseekables<?, ?> toUnseekables();
FullRoute<?> toRoute(RoutingKey homeKey);
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index 2feead3..4272b06 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -32,6 +32,8 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import static accord.primitives.Routables.Slice.Overlapping;
+
public interface Txn
{
enum Kind
@@ -74,7 +76,7 @@
this.update = null;
}
- public InMemory(@Nonnull Keys keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update)
+ public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update)
{
this.kind = Kind.Write;
this.keys = keys;
@@ -187,7 +189,7 @@
default Future<Data> read(SafeCommandStore safeStore, Command command)
{
Ranges ranges = safeStore.ranges().at(command.executeAt().epoch());
- List<Future<Data>> futures = read().keys().foldl(ranges, (key, accumulate, index) -> {
+ List<Future<Data>> futures = Routables.foldlMinimal(keys(), ranges, (key, accumulate, index) -> {
Future<Data> result = read().read(key, kind(), safeStore, command.executeAt(), safeStore.dataStore());
accumulate.add(result);
return accumulate;
diff --git a/accord-core/src/main/java/accord/primitives/Unseekables.java b/accord-core/src/main/java/accord/primitives/Unseekables.java
index 8cdb2ee..60799e8 100644
--- a/accord-core/src/main/java/accord/primitives/Unseekables.java
+++ b/accord-core/src/main/java/accord/primitives/Unseekables.java
@@ -3,7 +3,7 @@
import accord.api.RoutingKey;
/**
- * Either a Route or a simple collection of Unseekable
+ * Either a Route or a simple collection of keys or ranges
*/
public interface Unseekables<K extends Unseekable, U extends Unseekables<K, ?>> extends Iterable<K>, Routables<K, U>
{
@@ -24,11 +24,22 @@
@Override
U slice(Ranges ranges);
- @Override
- Unseekables<K, U> union(U with);
+ Unseekables<K, ?> slice(Ranges ranges, Slice slice);
+
+ /**
+ * Return an object containing any {@code K} present in either of the original collections.
+ *
+ * Differs from {@link Route#union} in that the parameter does not need to be a {@link Route}
+ * and the result may not be a {@link Route}, as the new object would not know the range
+ * covered by the additional keys or ranges.
+ */
+ Unseekables<K, ?> with(Unseekables<K, ?> with);
Unseekables<K, ?> with(RoutingKey withKey);
UnseekablesKind kind();
+ /**
+ * If both left and right are a Route, invoke {@link Route#union} on them. Otherwise invoke {@link #with}.
+ */
static <K extends Unseekable> Unseekables<K, ?> merge(Unseekables<K, ?> left, Unseekables<K, ?> right)
{
if (left == null) return right;
@@ -47,7 +58,7 @@
return right;
// non-route types can always accept route types as input, so just call its union method on the other
- return leftKind.isRoute() ? ((Unseekables)right).union(left) : ((Unseekables)left).union(right);
+ return left.with(right);
}
if (leftKind.isFullRoute())
@@ -55,7 +66,10 @@
if (rightKind.isFullRoute())
return right;
+
+ return ((Route)left).union((Route)right);
}
- return ((Unseekables)left).union(right);
+
+ return left.with(right);
}
}
diff --git a/accord-core/src/main/java/accord/primitives/Writes.java b/accord-core/src/main/java/accord/primitives/Writes.java
index c3f7635..af27cc5 100644
--- a/accord-core/src/main/java/accord/primitives/Writes.java
+++ b/accord-core/src/main/java/accord/primitives/Writes.java
@@ -32,10 +32,10 @@
{
public static final Future<Void> SUCCESS = ImmediateFuture.success(null);
public final Timestamp executeAt;
- public final Keys keys;
+ public final Seekables<?, ?> keys;
public final Write write;
- public Writes(Timestamp executeAt, Keys keys, Write write)
+ public Writes(Timestamp executeAt, Seekables<?, ?> keys, Write write)
{
this.executeAt = executeAt;
this.keys = keys;
@@ -71,7 +71,7 @@
if (ranges == null)
return SUCCESS;
- List<Future<Void>> futures = keys.foldl(ranges, (key, accumulate, index) -> {
+ List<Future<Void>> futures = Routables.foldl(keys, ranges, (key, accumulate, index) -> {
accumulate.add(write.apply(key, safeStore, executeAt, safeStore.dataStore()));
return accumulate;
}, new ArrayList<>());
diff --git a/accord-core/src/main/java/accord/topology/Topologies.java b/accord-core/src/main/java/accord/topology/Topologies.java
index 13a00ad..a8b8a98 100644
--- a/accord-core/src/main/java/accord/topology/Topologies.java
+++ b/accord-core/src/main/java/accord/topology/Topologies.java
@@ -333,7 +333,7 @@
{
Ranges ranges = Ranges.EMPTY;
for (int i = 0, mi = size() ; i < mi ; i++)
- ranges = ranges.union(get(i).rangesForNode(node));
+ ranges = ranges.with(get(i).rangesForNode(node));
return ranges;
}
diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java
index c055a82..5a385cf 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -268,7 +268,11 @@
bi = (int)(abi >>> 32);
if (predicate.test(param, bi))
+ {
+ if (count == newSubset.length)
+ newSubset = cachedInts.resize(newSubset, count, count * 2);
newSubset[count++] = bi;
+ }
ailim = as.findNext(ai + 1, bs.get(bi), FLOOR);
if (ailim < 0) ailim = -1 - ailim;
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java
index 6665ef5..236f17e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -413,7 +413,7 @@
if (end < start) throw new IllegalArgumentException();
Ranges ranges = localRangesForEpoch(start);
for (long i = start + 1; i <= end ; ++i)
- ranges = ranges.union(localRangesForEpoch(i));
+ ranges = ranges.with(localRangesForEpoch(i));
return ranges;
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
index 03b96ce..7b01458 100644
--- a/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedBiFunction.java
@@ -20,5 +20,8 @@
public interface IndexedBiFunction<P1, P2, O>
{
+ /**
+ * Apply some function to two object parameters and an associated index (usually within a collection).
+ */
O apply(P1 p1, P2 p2, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedConsumer.java b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
index cfa2c1b..b5f8c9b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedConsumer.java
@@ -20,5 +20,8 @@
public interface IndexedConsumer<P1>
{
+ /**
+ * Apply some consumer to an object parameter and an associated index (usually within a collection).
+ */
void accept(P1 p1, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedFold.java b/accord-core/src/main/java/accord/utils/IndexedFold.java
index 079c16c..d981ffb 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFold.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFold.java
@@ -20,4 +20,11 @@
public interface IndexedFold<P1, Accumulate> extends IndexedBiFunction<P1, Accumulate, Accumulate>
{
+ /**
+ * Apply some merge function accepting a constant object parameter p1, and the prior output of this function
+ * or the initial value, to some element of a collection, with the index of the element provided.
+ *
+ * This function is used for efficiently folding over some subset of a collection.
+ */
+ Accumulate apply(P1 p1, Accumulate p2, int index);
}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
index 0cd5a82..038fc8a 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldIntersectToLong.java
@@ -20,5 +20,12 @@
public interface IndexedFoldIntersectToLong<P1>
{
+ /**
+ * Apply some long->long merge function accepting a constant object parameter p1, a long parameter p2, and the prior
+ * output of this function or the initial value, to some element occurring in two collections, with each collection's
+ * index for the element provided as the final parameters.
+ *
+ * This function is used for folding over the common elements of two sorted collections.
+ */
long apply(P1 p1, long p2, long accumulate, int leftIndex, int rightIndex);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
index c36df26..72d345a 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFoldToLong.java
@@ -20,5 +20,12 @@
public interface IndexedFoldToLong<P1>
{
+ /**
+ * Apply some long->long merge function accepting a constant object parameter p1, a constant long parameter p2,
+ * and the prior output of this function or the initial value, to some element of a collection,
+ * with the index of the element provided.
+ *
+ * This function is used for efficiently folding over some subset of a collection.
+ */
long apply(P1 p1, long p2, long accumulate, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedFunction.java b/accord-core/src/main/java/accord/utils/IndexedFunction.java
index df7020e..3d0e359 100644
--- a/accord-core/src/main/java/accord/utils/IndexedFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedFunction.java
@@ -20,5 +20,8 @@
public interface IndexedFunction<P1, O>
{
+ /**
+ * Apply some function to an object parameter and an associated index (usually within a collection).
+ */
O apply(P1 p1, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
index 272c53a..3a1f14b 100644
--- a/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedIntFunction.java
@@ -20,5 +20,8 @@
public interface IndexedIntFunction<P1>
{
+ /**
+ * Apply some function to an int parameter and an associated index (usually within a collection).
+ */
int apply(P1 p1, int p2, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedPredicate.java b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
index 68b555b..7354c52 100644
--- a/accord-core/src/main/java/accord/utils/IndexedPredicate.java
+++ b/accord-core/src/main/java/accord/utils/IndexedPredicate.java
@@ -20,5 +20,8 @@
public interface IndexedPredicate<P1>
{
+ /**
+ * Apply some predicate to an object parameter and an associated index (usually within a collection).
+ */
boolean test(P1 p1, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
index 6381e9c..77ad756 100644
--- a/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeFoldToLong.java
@@ -20,5 +20,12 @@
public interface IndexedRangeFoldToLong
{
- long apply(long p1, long p2, int fromIndex, int toIndex);
+ /**
+ * Apply some long->long merge function accepting a constant parameter p1 and the prior output of this
+ * function or an initial value, to some range of indexes referencing some other indexed collection.
+ *
+ * This function is used for folding over a filtered collection, where contiguous ranges are expected
+ * and can be handled batch-wise.
+ */
+ long apply(long p1, long accumulator, int fromIndex, int toIndex);
}
\ No newline at end of file
diff --git a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
index 8fca7eb..c66d58f 100644
--- a/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedRangeTriConsumer.java
@@ -2,5 +2,11 @@
public interface IndexedRangeTriConsumer<P1, P2, P3>
{
+ /**
+ * Consume some object parameters associated with a range of indexes from a collection.
+ *
+ * The first parameter is typically used to convey some container the indexes refer to,
+ * with the others providing other configuration.
+ */
void accept(P1 p1, P2 p2, P3 p3, int fromIndex, int toIndex);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
index 4400ff8..9253413 100644
--- a/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriConsumer.java
@@ -2,5 +2,11 @@
public interface IndexedTriConsumer<P1, P2, P3>
{
+ /**
+ * Consume some object parameters associated with an index in a collection.
+ *
+ * The first parameter is typically used to convey some container the index refer to,
+ * with the others providing other configuration.
+ */
void accept(P1 p1, P2 p2, P3 p3, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
index 22b2e48..36c3817 100644
--- a/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
+++ b/accord-core/src/main/java/accord/utils/IndexedTriFunction.java
@@ -20,5 +20,11 @@
public interface IndexedTriFunction<P1, P2, P3, V>
{
+ /**
+ * Apply a function to some object parameters associated with an index in a collection.
+ *
+ * The first parameter is typically used to convey some container the index refer to,
+ * with the others providing other configuration.
+ */
V apply(P1 p1, P2 p2, P3 p3, int index);
}
diff --git a/accord-core/src/main/java/accord/utils/Invariants.java b/accord-core/src/main/java/accord/utils/Invariants.java
index de7f2b0..033bcff 100644
--- a/accord-core/src/main/java/accord/utils/Invariants.java
+++ b/accord-core/src/main/java/accord/utils/Invariants.java
@@ -7,11 +7,16 @@
public class Invariants
{
private static final boolean PARANOID = true;
+ private static final boolean DEBUG = true;
public static boolean isParanoid()
{
return PARANOID;
}
+ public static boolean debug()
+ {
+ return DEBUG;
+ }
public static <T1, T2 extends T1> T2 checkType(T1 cast)
{
@@ -57,6 +62,20 @@
return param;
}
+ public static int isNatural(int input)
+ {
+ if (input < 0)
+ throw new IllegalStateException();
+ return input;
+ }
+
+ public static long isNatural(long input)
+ {
+ if (input < 0)
+ throw new IllegalStateException();
+ return input;
+ }
+
public static void checkArgument(boolean condition)
{
if (!condition)
diff --git a/accord-core/src/test/java/accord/KeysTest.java b/accord-core/src/test/java/accord/KeysTest.java
index c8f7650..138de0d 100644
--- a/accord-core/src/test/java/accord/KeysTest.java
+++ b/accord-core/src/test/java/accord/KeysTest.java
@@ -114,11 +114,11 @@
void mergeTest()
{
assertEquals(keys(0, 1, 2, 3, 4),
- keys(0, 1, 2, 3, 4).union(keys(0, 1, 2, 3, 4)));
+ keys(0, 1, 2, 3, 4).with(keys(0, 1, 2, 3, 4)));
assertEquals(keys(0, 1, 2, 3, 4),
- keys(0, 1).union(keys(2, 3, 4)));
+ keys(0, 1).with(keys(2, 3, 4)));
assertEquals(keys(0, 1, 2, 3, 4),
- keys(0, 2, 4).union(keys(1, 3)));
+ keys(0, 2, 4).with(keys(1, 3)));
}
@Test
diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java
index 11366ce..42dab7d 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -88,6 +88,11 @@
return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY, MockStore.update(keys));
}
+ public static Txn writeTxn(Ranges ranges)
+ {
+ return new Txn.InMemory(ranges, MockStore.read(ranges), MockStore.QUERY, MockStore.update(ranges));
+ }
+
public static Txn readTxn(Keys keys)
{
return new Txn.InMemory(keys, MockStore.read(keys), MockStore.QUERY);
diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java
index bc75e62..b458e1d 100644
--- a/accord-core/src/test/java/accord/burn/BurnTest.java
+++ b/accord-core/src/test/java/accord/burn/BurnTest.java
@@ -36,6 +36,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
+import java.util.function.Predicate;
import accord.impl.IntHashKey;
import accord.impl.basic.Cluster;
@@ -51,14 +52,14 @@
import accord.impl.list.ListUpdate;
import accord.local.Node.Id;
import accord.api.Key;
-import accord.primitives.Txn;
-import accord.primitives.Keys;
+import accord.primitives.*;
import accord.verify.StrictSerializabilityVerifier;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static accord.impl.IntHashKey.forHash;
import static accord.utils.Utils.toArray;
public class BurnTest
@@ -79,26 +80,49 @@
Id client = clients.get(random.nextInt(clients.size()));
Id node = nodes.get(random.nextInt(clients.size()));
- int readCount = 1 + random.nextInt(2);
- int writeCount = random.nextInt(3);
-
- TreeSet<Key> requestKeys = new TreeSet<>();
- while (readCount-- > 0)
- requestKeys.add(randomKey(random, keys, requestKeys));
-
- ListUpdate update = new ListUpdate();
- while (writeCount-- > 0)
+ boolean isRangeQuery = random.nextBoolean();
+ if (isRangeQuery)
{
- int i = randomKeyIndex(random, keys, update.keySet());
- update.put(keys.get(i), ++next[i]);
- }
+ int rangeCount = 1 + random.nextInt(2);
+ List<Range> requestRanges = new ArrayList<>();
+ while (--rangeCount >= 0)
+ {
+ int j = 1 + random.nextInt(0xffff), i = Math.max(0, j - (1 + random.nextInt(0x1ffe)));
+ requestRanges.add(IntHashKey.range(forHash(i), forHash(j)));
+ }
+ Ranges ranges = Ranges.of(requestRanges.toArray(new Range[0]));
+ ListRead read = new ListRead(ranges, ranges);
+ ListQuery query = new ListQuery(client, count);
+ ListRequest request = new ListRequest(new Txn.InMemory(ranges, read, query, null));
+ packets.add(new Packet(client, node, count, request));
- Keys readKeys = new Keys(requestKeys);
- requestKeys.addAll(update.keySet());
- ListRead read = new ListRead(readKeys, new Keys(requestKeys));
- ListQuery query = new ListQuery(client, count);
- ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
- packets.add(new Packet(client, node, count, request));
+
+ }
+ else
+ {
+ boolean isWrite = random.nextBoolean();
+ int readCount = 1 + random.nextInt(2);
+ int writeCount = isWrite ? random.nextInt(3) : 0;
+
+ TreeSet<Key> requestKeys = new TreeSet<>();
+ while (readCount-- > 0)
+ requestKeys.add(randomKey(random, keys, requestKeys));
+
+ ListUpdate update = isWrite ? new ListUpdate() : null;
+ while (writeCount-- > 0)
+ {
+ int i = randomKeyIndex(random, keys, update.keySet());
+ update.put(keys.get(i), ++next[i]);
+ }
+
+ Keys readKeys = new Keys(requestKeys);
+ if (isWrite)
+ requestKeys.addAll(update.keySet());
+ ListRead read = new ListRead(readKeys, new Keys(requestKeys));
+ ListQuery query = new ListQuery(client, count);
+ ListRequest request = new ListRequest(new Txn.InMemory(new Keys(requestKeys), read, query, update));
+ packets.add(new Packet(client, node, count, request));
+ }
}
return packets;
@@ -111,8 +135,13 @@
private static int randomKeyIndex(Random random, List<Key> keys, Set<Key> notIn)
{
+ return randomKeyIndex(random, keys, notIn::contains);
+ }
+
+ private static int randomKeyIndex(Random random, List<Key> keys, Predicate<Key> notIn)
+ {
int i;
- while (notIn.contains(keys.get(i = random.nextInt(keys.size()))));
+ while (notIn.test(keys.get(i = random.nextInt(keys.size()))));
return i;
}
@@ -197,14 +226,13 @@
try
{
-
int start = starts[(int)packet.replyId];
int end = clock.incrementAndGet();
logger.debug("{} at [{}, {}]", reply, start, end);
replies[(int)packet.replyId] = packet;
- if (reply.readKeys == null)
+ if (reply.responseKeys == null)
{
if (reply.read == null) nacks.incrementAndGet();
else lost.incrementAndGet();
@@ -216,11 +244,11 @@
for (int i = 0 ; i < reply.read.length ; ++i)
{
- Key key = reply.readKeys.get(i);
+ Key key = reply.responseKeys.get(i);
int k = key(key);
int[] read = reply.read[i];
- int write = reply.update.getOrDefault(key, -1);
+ int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1);
if (read != null)
strictSerializable.witnessRead(k, read);
@@ -270,7 +298,7 @@
{
// Long overrideSeed = null;
int count = 1;
- Long overrideSeed = 188057951046487786L;
+ Long overrideSeed = 8602265915508619975L;
LongSupplier seedGenerator = ThreadLocalRandom.current()::nextLong;
for (int i = 0 ; i < args.length ; i += 2)
{
diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
index cc8645a..ce19a1d 100644
--- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java
+++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java
@@ -65,7 +65,7 @@
public CommandSync(TxnId txnId, CheckStatusOk status, long srcEpoch, long trgEpoch)
{
Invariants.checkArgument(status.saveStatus.hasBeen(Status.PreAccepted));
- Invariants.checkState(status.route != null);
+ Invariants.checkState(status.route != null && !status.route.isEmpty());
this.txnId = txnId;
this.status = status.saveStatus.status;
this.route = status.route;
diff --git a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
index 08471f6..d7bc992 100644
--- a/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
+++ b/accord-core/src/test/java/accord/coordinate/CoordinateTest.java
@@ -26,10 +26,9 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import static accord.Utils.id;
-import static accord.Utils.ids;
-import static accord.Utils.writeTxn;
+import static accord.Utils.*;
import static accord.impl.IntKey.keys;
+import static accord.impl.IntKey.range;
import static accord.primitives.Routable.Domain.Key;
import static accord.primitives.Txn.Kind.Write;
@@ -51,6 +50,22 @@
Assertions.assertEquals(MockStore.RESULT, result);
}
}
+ @Test
+ void simpleRangeTest() throws Throwable
+ {
+ try (MockCluster cluster = MockCluster.builder().build())
+ {
+ Node node = cluster.get(1);
+ Assertions.assertNotNull(node);
+
+ TxnId txnId = node.nextTxnId(Write, Key);
+ Ranges keys = ranges(range(1, 2));
+ Txn txn = writeTxn(keys);
+ FullRangeRoute route = keys.toRoute(keys.get(0).someIntersectingRoutingKey(null));
+ Result result = Coordinate.coordinate(node, txnId, txn, route).get();
+ Assertions.assertEquals(MockStore.RESULT, result);
+ }
+ }
@Test
void slowPathTest() throws Throwable
diff --git a/accord-core/src/test/java/accord/impl/IntHashKey.java b/accord-core/src/test/java/accord/impl/IntHashKey.java
index 75407f7..5df62d4 100644
--- a/accord-core/src/test/java/accord/impl/IntHashKey.java
+++ b/accord-core/src/test/java/accord/impl/IntHashKey.java
@@ -47,7 +47,7 @@
public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
{
Invariants.checkArgument(((IntHashKey)range.start()).hash + end.intValue() <= ((IntHashKey)range.end()).hash);
- return range.subRange(
+ return range.newRange(
new Hash(((IntHashKey)range.start()).hash + start.intValue()),
new Hash(((IntHashKey)range.start()).hash + end.intValue())
);
@@ -110,6 +110,12 @@
{
super(Integer.MIN_VALUE, hash);
}
+
+ @Override
+ public accord.primitives.Range asRange()
+ {
+ return new Range(new Hash(hash - 1), new Hash(hash));
+ }
}
public static class Range extends accord.primitives.Range.EndInclusive
@@ -120,7 +126,7 @@
}
@Override
- public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+ public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
{
return new Range((Hash) start, (Hash) end);
}
@@ -140,8 +146,7 @@
{
int subStart = i > 0 ? last : startHash;
int subEnd = i < count - 1 ? subStart + interval : endHash;
- ranges[i] = new Range(new Hash(subStart),
- new Hash(subEnd));
+ ranges[i] = new Range(new Hash(subStart), new Hash(subEnd));
last = subEnd;
}
return Ranges.ofSortedAndDeoverlapped(ranges);
@@ -159,7 +164,7 @@
private IntHashKey(int key, int hash)
{
- assert hash != hash(key);
+ assert key == Integer.MIN_VALUE || hash != hash(key);
this.key = key;
this.hash = hash;
}
@@ -187,16 +192,16 @@
public static accord.primitives.Range[] ranges(int count)
{
List<accord.primitives.Range> result = new ArrayList<>();
- long delta = (Integer.MAX_VALUE - (long)Integer.MIN_VALUE) / count;
- long start = Integer.MIN_VALUE;
+ long delta = 0xffff / count;
+ long start = 0;
Hash prev = new Hash((int)start);
for (int i = 1 ; i < count ; ++i)
{
- Hash next = new Hash((int)Math.min(Integer.MAX_VALUE, start + i * delta));
+ Hash next = new Hash((int)Math.min(0xffff, start + i * delta));
result.add(new Range(prev, next));
prev = next;
}
- result.add(new Range(prev, new Hash(Integer.MAX_VALUE)));
+ result.add(new Range(prev, new Hash(0xffff)));
return toArray(result, accord.primitives.Range[]::new);
}
@@ -234,7 +239,7 @@
crc32c.update(key >> 8);
crc32c.update(key >> 16);
crc32c.update(key >> 24);
- return (int)crc32c.getValue();
+ return (int)crc32c.getValue() & 0xffff;
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/IntKey.java b/accord-core/src/test/java/accord/impl/IntKey.java
index 379b110..a0b4ddd 100644
--- a/accord-core/src/test/java/accord/impl/IntKey.java
+++ b/accord-core/src/test/java/accord/impl/IntKey.java
@@ -47,7 +47,7 @@
public accord.primitives.Range subRange(accord.primitives.Range range, Long start, Long end)
{
Invariants.checkArgument(((IntKey)range.start()).key + end.intValue() <= ((IntKey)range.end()).key);
- return range.subRange(
+ return range.newRange(
routing(((IntKey)range.start()).key + start.intValue()),
routing(((IntKey)range.start()).key + end.intValue())
);
@@ -110,6 +110,12 @@
{
super(key);
}
+
+ @Override
+ public accord.primitives.Range asRange()
+ {
+ return new Range(new Routing(key - 1), new Routing(key));
+ }
}
public static class Range extends accord.primitives.Range.EndInclusive
@@ -120,7 +126,7 @@
}
@Override
- public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+ public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
{
return new Range((Routing)start, (Routing)end);
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListQuery.java b/accord-core/src/test/java/accord/impl/list/ListQuery.java
index 1d86444..281fe01 100644
--- a/accord-core/src/test/java/accord/impl/list/ListQuery.java
+++ b/accord-core/src/test/java/accord/impl/list/ListQuery.java
@@ -27,6 +27,7 @@
import accord.api.Key;
import accord.api.Query;
import accord.api.Result;
+import accord.primitives.Keys;
import accord.primitives.TxnId;
public class ListQuery implements Query
@@ -44,13 +45,14 @@
public Result compute(TxnId txnId, Data data, Read untypedRead, Update update)
{
ListRead read = (ListRead) untypedRead;
- int[][] values = new int[read.readKeys.size()][];
+ Keys responseKeys = Keys.ofSorted(((ListData)data).keySet());
+ int[][] values = new int[responseKeys.size()][];
for (Map.Entry<Key, int[]> e : ((ListData)data).entrySet())
{
- int i = read.readKeys.indexOf(e.getKey());
+ int i = responseKeys.indexOf(e.getKey());
if (i >= 0)
values[i] = e.getValue();
}
- return new ListResult(client, requestId, txnId, read.readKeys, values, (ListUpdate) update);
+ return new ListResult(client, requestId, txnId, read.readKeys, responseKeys, values, (ListUpdate) update);
}
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListRead.java b/accord-core/src/test/java/accord/impl/list/ListRead.java
index 6136117..3e7bb82 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRead.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRead.java
@@ -20,42 +20,50 @@
import accord.api.*;
import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
public class ListRead implements Read
{
private static final Logger logger = LoggerFactory.getLogger(ListRead.class);
- public final Keys readKeys;
- public final Keys keys;
+ public final Seekables<?, ?> readKeys;
+ public final Seekables<?, ?> keys;
- public ListRead(Keys readKeys, Keys keys)
+ public ListRead(Seekables<?, ?> readKeys, Seekables<?, ?> keys)
{
this.readKeys = readKeys;
this.keys = keys;
}
@Override
- public Keys keys()
+ public Seekables keys()
{
return keys;
}
@Override
- public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
{
ListStore s = (ListStore)store;
ListData result = new ListData();
- int[] data = s.get(key);
- logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
- result.put(key, data);
+ switch (key.domain())
+ {
+ default: throw new AssertionError();
+ case Key:
+ int[] data = s.get((Key)key);
+ logger.trace("READ on {} at {} key:{} -> {}", s.node, executeAt, key, data);
+ result.put((Key)key, data);
+ break;
+ case Range:
+ for (Map.Entry<Key, int[]> e : s.get((Range)key))
+ result.put(e.getKey(), e.getValue());
+ }
return ImmediateFuture.success(result);
}
@@ -68,7 +76,7 @@
@Override
public Read merge(Read other)
{
- return new ListRead(readKeys.union(((ListRead)other).readKeys), keys.union(((ListRead)other).keys));
+ return new ListRead(((Seekables)readKeys).with(((ListRead)other).readKeys), ((Seekables)keys).with(((ListRead)other).keys));
}
@Override
diff --git a/accord-core/src/test/java/accord/impl/list/ListRequest.java b/accord-core/src/test/java/accord/impl/list/ListRequest.java
index cdef9b0..f2a9883 100644
--- a/accord-core/src/test/java/accord/impl/list/ListRequest.java
+++ b/accord-core/src/test/java/accord/impl/list/ListRequest.java
@@ -117,10 +117,10 @@
switch (s)
{
case Invalidated:
- node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, null, null));
break;
case Lost:
- node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, new int[0][], null));
+ node.reply(client, replyContext, new ListResult(client, ((Packet)replyContext).requestId, txnId, null, null, new int[0][], null));
break;
case Neither:
// currently caught elsewhere in response tracking, but might help to throw an exception here
diff --git a/accord-core/src/test/java/accord/impl/list/ListResult.java b/accord-core/src/test/java/accord/impl/list/ListResult.java
index 12fb4a4..a203d90 100644
--- a/accord-core/src/test/java/accord/impl/list/ListResult.java
+++ b/accord-core/src/test/java/accord/impl/list/ListResult.java
@@ -27,6 +27,7 @@
import accord.messages.MessageType;
import accord.primitives.Keys;
import accord.messages.Reply;
+import accord.primitives.Seekables;
import accord.primitives.TxnId;
public class ListResult implements Result, Reply
@@ -34,16 +35,18 @@
public final Id client;
public final long requestId;
public final TxnId txnId;
- public final Keys readKeys;
+ public final Seekables<?, ?> readKeys;
+ public final Keys responseKeys;
public final int[][] read; // equal in size to keys.size()
public final ListUpdate update;
- public ListResult(Id client, long requestId, TxnId txnId, Keys readKeys, int[][] read, ListUpdate update)
+ public ListResult(Id client, long requestId, TxnId txnId, Seekables<?, ?> readKeys, Keys responseKeys, int[][] read, ListUpdate update)
{
this.client = client;
this.requestId = requestId;
this.txnId = txnId;
this.readKeys = readKeys;
+ this.responseKeys = responseKeys;
this.read = read;
this.update = update;
}
@@ -60,10 +63,10 @@
return "{client:" + client + ", "
+ "requestId:" + requestId + ", "
+ "txnId:" + txnId + ", "
- + (readKeys == null
+ + (responseKeys == null
? "invalidated!}"
- : "reads:" + IntStream.range(0, readKeys.size())
- .mapToObj(i -> readKeys.get(i) + ":" + Arrays.toString(read[i]))
+ : "reads:" + IntStream.range(0, responseKeys.size())
+ .mapToObj(i -> responseKeys.get(i) + ":" + Arrays.toString(read[i]))
.collect(Collectors.joining(", ", "{", "}")) + ", "
+ "writes:" + update + "}");
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java
index 012848b..cf79d13 100644
--- a/accord-core/src/test/java/accord/impl/list/ListStore.java
+++ b/accord-core/src/test/java/accord/impl/list/ListStore.java
@@ -18,18 +18,24 @@
package accord.impl.list;
-import java.util.Map;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import accord.api.Key;
import accord.local.Node;
import accord.api.DataStore;
+import accord.primitives.Range;
+import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
import accord.utils.Timestamped;
public class ListStore implements DataStore
{
static final int[] EMPTY = new int[0];
- final Map<Key, Timestamped<int[]>> data = new ConcurrentHashMap<>();
+ final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>();
// adding here to help trace burn test queries
public final Node.Id node;
@@ -44,4 +50,17 @@
Timestamped<int[]> v = data.get(key);
return v == null ? EMPTY : v.data;
}
+
+ public List<Map.Entry<Key, int[]>> get(Range range)
+ {
+ return data.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive())
+ .entrySet()
+ .stream().map(e -> new SimpleEntry<>((Key)e.getKey(), e.getValue().data))
+ .collect(Collectors.toList());
+ }
+
+ public synchronized void write(Key key, Timestamp executeAt, int[] value)
+ {
+ data.merge(key, new Timestamped<>(executeAt, value), Timestamped::merge);
+ }
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListUpdate.java b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
index e734724..055d1ea 100644
--- a/accord-core/src/test/java/accord/impl/list/ListUpdate.java
+++ b/accord-core/src/test/java/accord/impl/list/ListUpdate.java
@@ -28,11 +28,12 @@
import accord.api.Update;
import accord.primitives.Ranges;
import accord.primitives.Keys;
+import accord.primitives.Seekables;
public class ListUpdate extends TreeMap<Key, Integer> implements Update
{
@Override
- public Keys keys()
+ public Seekables<?, ?> keys()
{
return new Keys(navigableKeySet());
}
diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java
index 6a7d7df..b09e39d 100644
--- a/accord-core/src/test/java/accord/impl/list/ListWrite.java
+++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java
@@ -26,6 +26,7 @@
import accord.api.DataStore;
import accord.api.Write;
import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Writes;
import accord.utils.Timestamped;
@@ -38,13 +39,13 @@
private static final Logger logger = LoggerFactory.getLogger(ListWrite.class);
@Override
- public Future<Void> apply(Key key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public Future<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
{
ListStore s = (ListStore) store;
if (!containsKey(key))
return Writes.SUCCESS;
int[] data = get(key);
- s.data.merge(key, new Timestamped<>(executeAt, data), Timestamped::merge);
+ s.data.merge((Key)key, new Timestamped<>(executeAt, data), Timestamped::merge);
logger.trace("WRITE on {} at {} key:{} -> {}", s.node, executeAt, key, data);
return Writes.SUCCESS;
}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index c6f70c2..90613ff 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -19,6 +19,7 @@
package accord.impl.mock;
import accord.NetworkFilter;
+import accord.api.Key;
import accord.api.MessageSink;
import accord.coordinate.Timeout;
import accord.impl.*;
@@ -26,8 +27,6 @@
import accord.local.Node.Id;
import accord.local.ShardDistributor;
import accord.primitives.Ranges;
-import accord.primitives.Routable;
-import accord.primitives.Txn;
import accord.utils.EpochFunction;
import accord.utils.ThreadPoolScheduler;
import accord.primitives.TxnId;
diff --git a/accord-core/src/test/java/accord/impl/mock/MockStore.java b/accord-core/src/test/java/accord/impl/mock/MockStore.java
index ba5cafd..d74ca79 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockStore.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockStore.java
@@ -19,7 +19,6 @@
package accord.impl.mock;
import accord.api.Data;
-import accord.api.Key;
import accord.api.Query;
import accord.api.Read;
import accord.api.Result;
@@ -27,10 +26,7 @@
import accord.api.Update;
import accord.api.Write;
import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -48,18 +44,18 @@
public static final Query QUERY = (txnId, data, read, update) -> RESULT;
public static final Write WRITE = (key, commandStore, executeAt, store) -> ImmediateFuture.success(null);
- public static Read read(Keys keys)
+ public static Read read(Seekables<?, ?> keys)
{
return new Read()
{
@Override
- public Keys keys()
+ public Seekables<?, ?> keys()
{
return keys;
}
@Override
- public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
{
return ImmediateFuture.success(DATA);
}
@@ -73,7 +69,7 @@
@Override
public Read merge(Read other)
{
- return MockStore.read(keys.union(other.keys()));
+ return MockStore.read(((Seekables)keys).with(other.keys()));
}
@Override
@@ -84,12 +80,12 @@
};
}
- public static Update update(Keys keys)
+ public static Update update(Seekables<?, ?> keys)
{
return new Update()
{
@Override
- public Keys keys()
+ public Seekables<?, ?> keys()
{
return keys;
}
@@ -109,7 +105,7 @@
@Override
public Update merge(Update other)
{
- return MockStore.update(keys.union(other.keys()));
+ return MockStore.update(((Seekables)keys).with(other.keys()));
}
@Override
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index 6619555..d041679 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -56,6 +56,7 @@
private static final Id ID3 = id(3);
private static final List<Id> IDS = listOf(ID1, ID2, ID3);
private static final Topology TOPOLOGY = TopologyFactory.toTopology(IDS, 3, IntKey.range(0, 100));
+ private static final Ranges RANGE = Ranges.single(IntKey.range(0, 100));
private static final Ranges FULL_RANGE = Ranges.single(IntKey.range(routing(Integer.MIN_VALUE), routing(Integer.MAX_VALUE)));
private static final ReplyContext REPLY_CONTEXT = Network.replyCtxFor(0);
@@ -104,7 +105,7 @@
clock.increment(10);
preAccept.process(node, ID2, REPLY_CONTEXT);
- Command command = ((InMemoryCommandTimeseries<?>)inMemory(commandStore).commandsForKey(key).uncommitted()).all().findFirst().get();
+ Command command = ((InMemoryCommandTimeseries)inMemory(commandStore).commandsForKey(key).byId()).all().findFirst().get();
Assertions.assertEquals(Status.PreAccepted, command.status());
messageSink.assertHistorySizes(0, 1);
@@ -233,7 +234,7 @@
clock.increment(10);
preAccept.process(node, ID2, REPLY_CONTEXT);
- Command command = ((InMemoryCommandTimeseries<?>)inMemory(commandStore).commandsForKey(key).uncommitted()).all().findFirst().get();
+ Command command = ((InMemoryCommandTimeseries)inMemory(commandStore).commandsForKey(key).byId()).all().findFirst().get();
Assertions.assertEquals(Status.PreAccepted, command.status());
messageSink.assertHistorySizes(0, 1);
diff --git a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
index c490041..53e566f 100644
--- a/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
+++ b/accord-core/src/test/java/accord/primitives/KeyDepsTest.java
@@ -509,7 +509,7 @@
if (ranges.contains(key))
deps.canonical.get(key).forEach(txnId -> canonical.add(new Entry<>(key, txnId)));
}
- deps.test.forEach(ranges, new BiConsumer<>()
+ deps.test.forEach(ranges, new BiConsumer<Key, TxnId>()
{
int i = 0;
@Override
diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
index 7942979..371c9ef 100644
--- a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
+++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java
@@ -246,7 +246,7 @@
{
Ranges previous = previouslyReplicated.getOrDefault(entry.getKey(), Ranges.EMPTY);
Ranges added = entry.getValue();
- Ranges merged = previous.union(added).mergeTouching();
+ Ranges merged = previous.with(added).mergeTouching();
previouslyReplicated.put(entry.getKey(), merged);
}
diff --git a/accord-core/src/test/java/accord/utils/RangesTest.java b/accord-core/src/test/java/accord/utils/RangesTest.java
index bb2e4fe..9152363 100644
--- a/accord-core/src/test/java/accord/utils/RangesTest.java
+++ b/accord-core/src/test/java/accord/utils/RangesTest.java
@@ -79,13 +79,13 @@
void addTest()
{
Assertions.assertEquals(ranges(r(0, 50), r(50, 100), r(100, 150), r(150, 200)),
- ranges(r(0, 50), r(100, 150)).union(ranges(r(50, 100), r(150, 200))));
+ ranges(r(0, 50), r(100, 150)).with(ranges(r(50, 100), r(150, 200))));
}
private static void assertMergeResult(Ranges expected, Ranges input1, Ranges input2)
{
- Assertions.assertEquals(expected, input1.union(input2));
- Assertions.assertEquals(expected, input2.union(input1));
+ Assertions.assertEquals(expected, input1.with(input2));
+ Assertions.assertEquals(expected, input2.with(input1));
}
@Test
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
index bfd5311..696f7d6 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Datum.java
@@ -60,50 +60,14 @@
throw new IllegalArgumentException();
MaelstromKey.Routing[] result = new MaelstromKey.Routing[count];
- switch (this)
- {
- case STRING:
- {
- // use only alphanumeric values to compute ranges
- long range = 63 * 63 * 63 * 63 + 1;
- long delta = range / count;
- for (int i = 0 ; i < count ; ++i)
- result[i] = new MaelstromKey.Routing(this, toString(i * delta));
- break;
- }
- case DOUBLE:
- {
- result[0] = new MaelstromKey.Routing(Double.NEGATIVE_INFINITY);
- if (count == 2)
- {
- result[1] = new MaelstromKey.Routing(0d);
- }
- else
- {
- double delta = Double.MAX_VALUE * (2d / count);
- double cur = -Double.MAX_VALUE;
- for (int i = 1 ; i < count ; ++i)
- result[i] = new MaelstromKey.Routing(cur += delta);
- }
- break;
- }
- case LONG:
- {
- long delta = 2 * (Long.MAX_VALUE / count);
- long start = Long.MIN_VALUE;
- for (int i = 0 ; i < count ; ++i)
- result[i] = new MaelstromKey.Routing(this, start + i * delta);
- break;
- }
- case HASH:
- {
- int delta = 2 * (Integer.MAX_VALUE / count);
- int start = Integer.MIN_VALUE;
- for (int i = 0 ; i < count ; ++i)
- result[i] = new MaelstromKey.Routing(this, new Hash(start + i * delta));
- break;
- }
- }
+ if (this != Kind.HASH)
+ throw new UnsupportedOperationException();
+
+ int delta = 2 * (Integer.MAX_VALUE / count);
+ int start = Integer.MIN_VALUE;
+ for (int i = 0; i < count; ++i)
+ result[i] = new MaelstromKey.Routing(start + i * delta);
+
return result;
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
index ba08474..958f895 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/Json.java
@@ -387,7 +387,7 @@
out.name("executeAt");
GSON.toJson(value.executeAt, Timestamp.class, out);
out.name("keys");
- Keys keys = value.keys;
+ Keys keys = (Keys) value.keys;
KEYS_ADAPTER.write(out, keys);
out.name("writes");
MaelstromWrite write = (MaelstromWrite) value.write;
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
index 057b335..b42af64 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromKey.java
@@ -55,9 +55,9 @@
Invariants.checkState(end - start <= Integer.MAX_VALUE);
long startHash = hash(range.start());
Invariants.checkArgument(startHash + end <= hash(range.end()));
- return range.subRange(
- new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + start))),
- new Routing(Datum.Kind.HASH, new Datum.Hash((int) (startHash + end)))
+ return range.newRange(
+ new Routing((int) (startHash + start)),
+ new Routing((int) (startHash + end))
);
}
@@ -119,14 +119,22 @@
public static class Routing extends MaelstromKey implements accord.api.RoutingKey
{
- public Routing(Datum.Kind kind, Object value)
+ public Routing(Datum.Kind kind, Object hash)
{
- super(kind, value);
+ super(kind, hash);
+ Invariants.checkArgument(kind == Datum.Kind.HASH);
}
- public Routing(Double value)
+ public Routing(int hash)
{
- super(value);
+ super(new Datum(new Datum.Hash(hash)));
+ }
+
+ @Override
+ public accord.primitives.Range asRange()
+ {
+ return new Range(new Routing(datum.hashCode() - 1),
+ new Routing(datum.hashCode()));
}
}
@@ -138,7 +146,7 @@
}
@Override
- public accord.primitives.Range subRange(RoutingKey start, RoutingKey end)
+ public accord.primitives.Range newRange(RoutingKey start, RoutingKey end)
{
return new Range(start, end);
}
@@ -156,6 +164,11 @@
datum = new Datum(value);
}
+ MaelstromKey(Datum value)
+ {
+ datum = value;
+ }
+
@Override
public int compareTo(@Nonnull RoutableKey that)
{
@@ -207,7 +220,7 @@
{
if (this instanceof Routing)
return (Routing)this;
- return new Routing(datum.kind, datum.value);
+ return new Routing(datum.value.hashCode());
}
@Override
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
index 9e248db..e2b808f 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromRead.java
@@ -20,10 +20,7 @@
import accord.api.*;
import accord.local.SafeCommandStore;
-import accord.primitives.Ranges;
-import accord.primitives.Keys;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
+import accord.primitives.*;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
@@ -45,11 +42,11 @@
}
@Override
- public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
{
MaelstromStore s = (MaelstromStore)store;
MaelstromData result = new MaelstromData();
- result.put(key, s.get(key));
+ result.put((Key)key, s.get((Key)key));
return ImmediateFuture.success(result);
}
@@ -63,12 +60,12 @@
public Read merge(Read other)
{
MaelstromRead that = (MaelstromRead) other;
- Keys readKeys = this.readKeys.union(that.readKeys);
- Keys keys = this.keys.union(that.keys);
+ Keys readKeys = this.readKeys.with(that.readKeys);
+ Keys keys = this.keys.with(that.keys);
if (readKeys == this.readKeys && keys == this.keys)
return this;
if (readKeys == that.readKeys && keys == that.keys)
return that;
- return new MaelstromRead(readKeys.union(that.readKeys), keys.union(that.keys));
+ return new MaelstromRead(readKeys.with(that.readKeys), keys.with(that.keys));
}
}
diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
index f572a46..68996b0 100644
--- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
+++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromWrite.java
@@ -22,6 +22,7 @@
import accord.api.DataStore;
import accord.api.Write;
import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Writes;
import accord.utils.Timestamped;
@@ -32,11 +33,11 @@
public class MaelstromWrite extends TreeMap<Key, Value> implements Write
{
@Override
- public Future<Void> apply(Key key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
+ public Future<Void> apply(Seekable key, SafeCommandStore commandStore, Timestamp executeAt, DataStore store)
{
MaelstromStore s = (MaelstromStore) store;
if (containsKey(key))
- s.data.merge(key, new Timestamped<>(executeAt, get(key)), Timestamped::merge);
+ s.data.merge((Key)key, new Timestamped<>(executeAt, get(key)), Timestamped::merge);
return Writes.SUCCESS;
}
}