Serialization improvements
patch by Benedict; reviewed by David Capwell for CASSANDRA-20578
diff --git a/accord-core/src/main/java/accord/api/Sliceable.java b/accord-core/src/main/java/accord/api/Sliceable.java
new file mode 100644
index 0000000..60c0e83
--- /dev/null
+++ b/accord-core/src/main/java/accord/api/Sliceable.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.api;
+
+import accord.primitives.Participants;
+import accord.primitives.Ranges;
+
+public interface Sliceable<T extends Sliceable<T>>
+{
+ T slice(Ranges ranges);
+ T intersecting(Participants<?> participants);
+ T merge(T merge);
+}
diff --git a/accord-core/src/main/java/accord/api/Update.java b/accord-core/src/main/java/accord/api/Update.java
index e9b6f87..d28282e 100644
--- a/accord-core/src/main/java/accord/api/Update.java
+++ b/accord-core/src/main/java/accord/api/Update.java
@@ -30,7 +30,7 @@
* Takes as input the data returned by {@code Read}, and returns a {@code Write}
* representing new information to distributed to each shard's stores.
*/
-public interface Update
+public interface Update extends Sliceable<Update>
{
Seekables<?, ?> keys();
// null is provided only if nothing was read
diff --git a/accord-core/src/main/java/accord/primitives/AbstractKeys.java b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
index 376ade8..2dee5b8 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractKeys.java
@@ -60,13 +60,15 @@
return Arrays.equals(keys, that.keys);
}
- public int indexOf(K key)
+ public final int indexOf(K key)
{
- return Arrays.binarySearch(keys, key);
- }
-
- public int indexOf(RoutingKey key)
- {
+ if (keys.length == 1)
+ {
+ int c = key.compareTo(keys[0]);
+ if (c == 0) return 0;
+ else if (c < 0) return -1;
+ else return -2;
+ }
return Arrays.binarySearch(keys, key);
}
diff --git a/accord-core/src/main/java/accord/primitives/AbstractRanges.java b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
index fa610d5..266b434 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractRanges.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractRanges.java
@@ -205,6 +205,12 @@
}
// returns ki in bottom 32 bits, ri in top, or -1 if no match found
+ public final long findNextExactIntersection(int thisi, AbstractRanges that, int thati)
+ {
+ return SortedArrays.findNextIntersection(ranges, thisi, that.ranges, thati, Range::compare);
+ }
+
+ // returns ki in bottom 32 bits, ri in top, or -1 if no match found
public final long findNextSameKindIntersection(int thisi, Routables<Range> that, int thati)
{
return findNextIntersection(thisi, (AbstractRanges) that, thati);
@@ -841,6 +847,12 @@
}
@Inline
+ public final long foldlExact(AbstractRanges intersect, IndexedFoldToLong<Range> fold, long param, long accumulator, long terminalValue)
+ {
+ return Routables.foldlExact(this, intersect, fold, param, accumulator, terminalValue);
+ }
+
+ @Inline
public final <P1, P2, V> V foldl(AbstractRanges intersect, IndexedTriFold<P1, P2, Range, V> fold, P1 p1, P2 p2, V accumulator)
{
return Routables.foldl(this, intersect, fold, p1, p2, accumulator, i -> false);
diff --git a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
index 48d5526..0f68707 100644
--- a/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
+++ b/accord-core/src/main/java/accord/primitives/AbstractUnseekableKeys.java
@@ -43,12 +43,6 @@
}
@Override
- public final int indexOf(RoutingKey key)
- {
- return Arrays.binarySearch(keys, key);
- }
-
- @Override
public final boolean contains(RoutingKey key)
{
return Arrays.binarySearch(keys, key) >= 0;
diff --git a/accord-core/src/main/java/accord/primitives/Keys.java b/accord-core/src/main/java/accord/primitives/Keys.java
index 2b57254..e73030b 100644
--- a/accord-core/src/main/java/accord/primitives/Keys.java
+++ b/accord-core/src/main/java/accord/primitives/Keys.java
@@ -71,15 +71,6 @@
}
@Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Keys keys1 = (Keys) o;
- return Arrays.equals(keys, keys1.keys);
- }
-
- @Override
public Keys with(Keys that)
{
return wrap(SortedArrays.linearUnion(keys, that.keys, cachedKeys()), that);
diff --git a/accord-core/src/main/java/accord/primitives/PartialTxn.java b/accord-core/src/main/java/accord/primitives/PartialTxn.java
index f23cbbd..3d88ed2 100644
--- a/accord-core/src/main/java/accord/primitives/PartialTxn.java
+++ b/accord-core/src/main/java/accord/primitives/PartialTxn.java
@@ -20,7 +20,9 @@
import accord.api.Query;
import accord.api.Read;
+import accord.api.Sliceable;
import accord.api.Update;
+import accord.utils.Invariants;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -66,6 +68,11 @@
super(kind, keys, read, query, update);
}
+ public InMemory(Kind kind, Seekables<?, ?> keys, Read read, Query query, Update update, Sliceable implementationDefined)
+ {
+ super(kind, keys, read, query, update, implementationDefined);
+ }
+
@Override
public PartialTxn with(PartialTxn add)
{
@@ -76,17 +83,24 @@
Read read = this.read().merge(add.read());
Query query = this.query() == null ? add.query() : this.query();
Update update = this.update() == null ? null : this.update().merge(add.update());
+ Sliceable implementationDefined = null;
+ if (this.implementationDefined != null)
+ {
+ Invariants.require(add instanceof Txn.InMemory);
+ implementationDefined = this.implementationDefined.merge(((Txn.InMemory)add).implementationDefined);
+ }
+
if (keys == this.keys())
{
- if (read == this.read() && query == this.query() && update == this.update())
+ if (read == this.read() && query == this.query() && update == this.update() && implementationDefined == this.implementationDefined)
return this;
}
else if (keys == add.keys())
{
- if (read == add.read() && query == add.query() && update == add.update())
+ if (read == add.read() && query == add.query() && update == add.update() && (implementationDefined == null || implementationDefined == ((Txn.InMemory)add).implementationDefined))
return add;
}
- return new PartialTxn.InMemory(kind(), keys, read, query, update);
+ return new PartialTxn.InMemory(kind(), keys, read, query, update, implementationDefined);
}
@Override
@@ -95,7 +109,7 @@
if (!kind().isSystemTxn() && !covers(route) || query() == null)
throw illegalState("Incomplete PartialTxn: " + this + ", route: " + route);
- return new Txn.InMemory(kind(), keys(), read(), query(), update());
+ return new Txn.InMemory(kind(), keys(), read(), query(), update(), implementationDefined);
}
@Override
@@ -107,7 +121,7 @@
if (this.keys().containsAll(covering))
return this;
- return new PartialTxn.InMemory(kind(), keys(), read(), query(), update());
+ return new PartialTxn.InMemory(kind(), keys(), read(), query(), update(), implementationDefined);
}
@Nonnull
@@ -122,7 +136,8 @@
return new PartialTxn.InMemory(
kind(), intersecting,
read().intersecting(participants), includeQuery ? query() : null,
- update() == null ? null : update().intersecting(participants)
+ update() == null ? null : update().intersecting(participants),
+ implementationDefined == null ? null : implementationDefined.intersecting(participants)
);
}
diff --git a/accord-core/src/main/java/accord/primitives/Routables.java b/accord-core/src/main/java/accord/primitives/Routables.java
index cf08d5f..09c66a1 100644
--- a/accord-core/src/main/java/accord/primitives/Routables.java
+++ b/accord-core/src/main/java/accord/primitives/Routables.java
@@ -265,6 +265,16 @@
}
/**
+ * Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order.
+ * Terminate once we hit {@code terminalValue}.
+ */
+ @Inline
+ static long foldlExact(AbstractRanges inputs, AbstractRanges matching, IndexedFoldToLong<? super Range> fold, long param, long initialValue, long terminalValue)
+ {
+ return Helper.foldl(AbstractRanges::findNextExactIntersection, Helper::findLimit, inputs, matching, fold, param, initialValue, terminalValue);
+ }
+
+ /**
* Fold-left over the {@code inputs} that intersect with {@code matching} in ascending order, passing the contiguous ranges that intersect to the IndexedRangeFold function.
* Terminate once we hit {@code terminalValue}.
*/
diff --git a/accord-core/src/main/java/accord/primitives/Txn.java b/accord-core/src/main/java/accord/primitives/Txn.java
index df9ee06..b52298b 100644
--- a/accord-core/src/main/java/accord/primitives/Txn.java
+++ b/accord-core/src/main/java/accord/primitives/Txn.java
@@ -30,6 +30,7 @@
import accord.api.Query;
import accord.api.Read;
import accord.api.Result;
+import accord.api.Sliceable;
import accord.api.Update;
import accord.local.SafeCommandStore;
import accord.utils.Invariants;
@@ -302,24 +303,42 @@
private final Read read;
private final Query query;
private final Update update;
+ // TODO (desired): maybe introduce a C* Txn object instead of stashing this here
+ public final Sliceable implementationDefined;
public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query)
{
- this(Kind.Read, keys, read, query, null);
+ this(Kind.Read, keys, read, query, null, null);
+ }
+
+ public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query, Sliceable implementationDefined)
+ {
+ this(Kind.Read, keys, read, query, null, implementationDefined);
}
public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update)
{
- this(Kind.Write, keys, read, query, update);
+ this(Kind.Write, keys, read, query, update, null);
+ }
+
+ public InMemory(@Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nonnull Query query, @Nullable Update update, Sliceable implementationDefined)
+ {
+ this(Kind.Write, keys, read, query, update, implementationDefined);
}
public InMemory(@Nonnull Kind kind, @Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nullable Query query, @Nullable Update update)
{
+ this(kind, keys, read, query, update, null);
+ }
+
+ public InMemory(@Nonnull Kind kind, @Nonnull Seekables<?, ?> keys, @Nonnull Read read, @Nullable Query query, @Nullable Update update, Sliceable implementationDefined)
+ {
this.kind = kind;
this.keys = keys;
this.read = read;
this.update = update;
this.query = query;
+ this.implementationDefined = implementationDefined;
Invariants.require(kind != Kind.ExclusiveSyncPoint || keys.domain() == Routable.Domain.Range);
}
@@ -329,7 +348,8 @@
return new PartialTxn.InMemory(
kind(), keys().slice(ranges, Minimal),
read().slice(ranges), includeQuery ? query() : null,
- update() == null ? null : update().slice(ranges)
+ update() == null ? null : update().slice(ranges),
+ implementationDefined == null ? null : implementationDefined.slice(ranges)
);
}
@@ -339,8 +359,10 @@
{
return new PartialTxn.InMemory(
kind(), keys().intersecting(participants, Minimal),
- read().intersecting(participants), includeQuery ? query() : null,
- update() == null ? null : update().intersecting(participants)
+ read.intersecting(participants),
+ includeQuery ? query() : null,
+ update == null ? null : update.intersecting(participants),
+ implementationDefined == null ? null : implementationDefined.intersecting(participants)
);
}