blob: 9da07232ca52664b2d9acc22a02cdc2df961b87c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.local;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import accord.api.RoutingKey;
import accord.local.Status.Durability;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import accord.utils.Invariants;
import accord.utils.ReducingIntervalMap;
import accord.utils.ReducingRangeMap;
import static accord.local.Status.Durability.MajorityOrInvalidated;
import static accord.local.Status.Durability.NotDurable;
import static accord.local.Status.Durability.UniversalOrInvalidated;
public class DurableBefore extends ReducingRangeMap<DurableBefore.Entry>
{
public static class SerializerSupport
{
public static DurableBefore create(boolean inclusiveEnds, RoutingKey[] ends, Entry[] values)
{
return new DurableBefore(inclusiveEnds, ends, values);
}
}
public static class Entry
{
public final @Nonnull TxnId majorityBefore, universalBefore;
public Entry(@Nonnull TxnId majority, @Nonnull TxnId universalBefore)
{
Invariants.checkArgument(majority.compareTo(universalBefore) >= 0, "majority %s < universal %s", majority, universalBefore);
this.majorityBefore = majority;
this.universalBefore = universalBefore;
}
private static Entry max(Entry a, Entry b)
{
return reduce(a, b, TxnId::max);
}
private static Entry min(Entry a, Entry b)
{
return reduce(a, b, TxnId::min);
}
private static Entry reduce(Entry a, Entry b, BiFunction<TxnId, TxnId, TxnId> reduce)
{
TxnId majority = reduce.apply(a.majorityBefore, b.majorityBefore);
TxnId universal = reduce.apply(a.universalBefore, b.universalBefore);
if (majority == a.majorityBefore && universal == a.universalBefore)
return a;
if (majority.equals(b.majorityBefore) && universal.equals(b.universalBefore))
return b;
return new Entry(majority, universal);
}
public Durability get(TxnId txnId)
{
if (txnId.compareTo(majorityBefore) < 0)
return txnId.compareTo(universalBefore) < 0 ? UniversalOrInvalidated : MajorityOrInvalidated;
return NotDurable;
}
static Durability mergeMin(Entry entry, @Nullable Durability prev, TxnId txnId)
{
Durability next = entry.get(txnId);
return prev != null && prev.compareTo(next) <= 0 ? prev : next;
}
static Durability mergeMax(Entry entry, Durability prev, TxnId txnId)
{
Durability next = entry.get(txnId);
return prev != null && prev.compareTo(next) >= 0 ? prev : next;
}
public boolean equals(Object that)
{
return that instanceof Entry && equals((Entry) that);
}
public boolean equals(Entry that)
{
return this.majorityBefore.equals(that.majorityBefore)
&& this.universalBefore.equals(that.universalBefore);
}
@Override
public String toString()
{
return "(" + majorityBefore + "," + universalBefore + ")";
}
}
public static final DurableBefore EMPTY = new DurableBefore();
final Entry min;
private DurableBefore()
{
this.min = new Entry(TxnId.NONE, TxnId.NONE);
}
DurableBefore(boolean inclusiveEnds, RoutingKey[] starts, Entry[] values)
{
super(inclusiveEnds, starts, values);
if (values.length == 0)
{
min = new Entry(TxnId.NONE, TxnId.NONE);
}
else
{
Entry min = null;
for (Entry value : values)
{
if (value == null)
continue;
if (min == null) min = value;
else min = Entry.min(min, value);
}
this.min = min;
}
}
public static DurableBefore create(Ranges ranges, @Nonnull TxnId majority, @Nonnull TxnId universal)
{
if (ranges.isEmpty())
return DurableBefore.EMPTY;
Entry entry = new Entry(majority, universal);
return create(ranges, entry, Builder::new);
}
public static DurableBefore merge(DurableBefore a, DurableBefore b)
{
return ReducingIntervalMap.merge(a, b, DurableBefore.Entry::max, Builder::new);
}
public Durability min(TxnId txnId, Unseekables<?> unseekables)
{
return notDurableIfNull(foldl(unseekables, Entry::mergeMin, null, txnId, test -> test == NotDurable));
}
public Durability max(TxnId txnId, Unseekables<?> unseekables)
{
return notDurableIfNull(foldl(unseekables, Entry::mergeMax, null, txnId, test -> test == UniversalOrInvalidated));
}
public Durability get(TxnId txnId, RoutingKey participant)
{
DurableBefore.Entry entry = get(participant);
return entry == null ? NotDurable : entry.get(txnId);
}
public boolean isUniversal(TxnId txnId, RoutingKey participant)
{
return get(txnId, participant) == UniversalOrInvalidated;
}
public boolean isSomeShardDurable(TxnId txnId, Participants<?> participants, Durability durability)
{
return max(txnId, participants).compareTo(durability) >= 0;
}
public Durability min(TxnId txnId)
{
if (min.universalBefore.compareTo(txnId) > 0)
return UniversalOrInvalidated;
if (min.majorityBefore.compareTo(txnId) > 0)
return MajorityOrInvalidated;
return NotDurable;
}
private static Durability notDurableIfNull(Durability status)
{
return status == null ? NotDurable : status;
}
static class Builder extends AbstractBoundariesBuilder<RoutingKey, Entry, DurableBefore>
{
protected Builder(boolean inclusiveEnds, int capacity)
{
super(inclusiveEnds, capacity);
}
@Override
protected DurableBefore buildInternal()
{
return new DurableBefore(inclusiveEnds, starts.toArray(new RoutingKey[0]), values.toArray(new Entry[0]));
}
}
}