blob: ab2a6cadc942592cbf60d468c0c01e36ad2c9cf6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.messages;
import java.util.function.BiFunction;
import accord.local.SafeCommandStore;
import accord.utils.MapReduceConsume;
import accord.primitives.*;
import accord.utils.Invariants;
import accord.api.RoutingKey;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.PreLoadContext;
import accord.primitives.Ranges;
import accord.primitives.PartialRoute;
import accord.primitives.Route;
import accord.primitives.FullRoute;
import accord.topology.Topologies;
import accord.topology.Topology;
import static java.lang.Long.min;
public abstract class TxnRequest<R> implements Request, PreLoadContext, MapReduceConsume<SafeCommandStore, R>
{
public static abstract class WithUnsynced<R> extends TxnRequest<R>
{
public final long minUnsyncedEpoch; // TODO (low priority, clarity): can this just always be TxnId.epoch?
public final boolean doNotComputeProgressKey;
public WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route)
{
this(to, topologies, txnId, route, latestRelevantEpochIndex(to, topologies, route));
}
private WithUnsynced(Id to, Topologies topologies, TxnId txnId, FullRoute<?> route, int startIndex)
{
super(to, topologies, route, txnId, startIndex);
this.minUnsyncedEpoch = topologies.oldestEpoch();
this.doNotComputeProgressKey = doNotComputeProgressKey(topologies, startIndex, txnId, waitForEpoch());
Ranges ranges = topologies.forEpoch(txnId.epoch()).rangesForNode(to);
if (doNotComputeProgressKey)
{
Invariants.checkState(!route.intersects(ranges)); // confirm dest is not a replica on txnId.epoch
}
else if (Invariants.isParanoid())
{
long progressEpoch = Math.min(waitForEpoch(), txnId.epoch());
Ranges computesRangesOn = topologies.forEpoch(progressEpoch).rangesForNode(to);
if (computesRangesOn == null)
Invariants.checkState(!route.intersects(ranges));
else
Invariants.checkState(route.slice(computesRangesOn).equals(route.slice(ranges)));
}
}
protected WithUnsynced(TxnId txnId, PartialRoute<?> scope, long waitForEpoch, long minUnsyncedEpoch, boolean doNotComputeProgressKey)
{
super(txnId, scope, waitForEpoch);
this.minUnsyncedEpoch = minUnsyncedEpoch;
this.doNotComputeProgressKey = doNotComputeProgressKey;
}
@Override
RoutingKey progressKey(Node node)
{
if (doNotComputeProgressKey)
return null;
return super.progressKey(node);
}
}
public final TxnId txnId;
public final PartialRoute<?> scope;
public final long waitForEpoch;
protected transient RoutingKey progressKey;
protected transient Node node;
protected transient Id replyTo;
protected transient ReplyContext replyContext;
public TxnRequest(Node.Id to, Topologies topologies, Route<?> route, TxnId txnId)
{
this(to, topologies, route, txnId, latestRelevantEpochIndex(to, topologies, route));
}
public TxnRequest(Node.Id to, Topologies topologies, Route<?> route, TxnId txnId, int startIndex)
{
this(txnId, computeScope(to, topologies, route, startIndex), computeWaitForEpoch(to, topologies, startIndex));
}
public TxnRequest(TxnId txnId, PartialRoute<?> scope, long waitForEpoch)
{
Invariants.checkState(!scope.isEmpty());
this.txnId = txnId;
this.scope = scope;
this.waitForEpoch = waitForEpoch;
}
/**
* The portion of the complete Route that this TxnRequest applies to. Should represent the complete
* range owned by the target node for the involved epochs.
*/
public PartialRoute<?> scope()
{
return scope;
}
/**
* The minimum epoch the recipient needs to know in order to process the request. This is computed by the sender
* to permit a recipient to process a request before knowing of a topology change if the sender determines it is
* safe to do so.
*/
@Override
public long waitForEpoch()
{
return waitForEpoch;
}
@Override
public void process(Node on, Id replyTo, ReplyContext replyContext)
{
this.node = on;
this.replyTo = replyTo;
this.replyContext = replyContext;
this.progressKey = progressKey(node); // TODO (low priority, clarity): not every class that extends TxnRequest needs this set
process();
}
RoutingKey progressKey(Node node)
{
// if waitForEpoch < txnId.epoch, then this replica's ownership is unchanged
long progressEpoch = min(waitForEpoch(), txnId.epoch());
return node.trySelectProgressKey(progressEpoch, scope, scope.homeKey());
}
protected abstract void process();
// finds the first topology index that intersects with the node
protected static int latestRelevantEpochIndex(Node.Id node, Topologies topologies, Routables<?, ?> route)
{
Ranges latest = topologies.current().rangesForNode(node);
if (route.intersects(latest))
return 0;
int i = 0;
int mi = topologies.size();
// find first non-null for node
while (latest.isEmpty())
{
if (++i == mi)
return mi;
latest = topologies.get(i).rangesForNode(node);
}
if (route.intersects(latest))
return i;
// find first non-empty intersection for node
while (++i < mi)
{
Ranges next = topologies.get(i).rangesForNode(node);
if (!next.equals(latest))
{
if (route.intersects(next))
return i;
latest = next;
}
}
return mi;
}
/**
* Compute the minimum epoch the recipient must know in order to safely process the request.
*
* For now use a simple heuristic of whether the node's ownership ranges have changed,
* on the assumption that this might also mean some local shard rearrangement
* (ignoring the case where the latest epochs do not intersect the keys at all)
*/
public static long computeWaitForEpoch(Node.Id node, Topologies topologies, Unseekables<?, ?> scope)
{
return computeWaitForEpoch(node, topologies, latestRelevantEpochIndex(node, topologies, scope));
}
public static long computeWaitForEpoch(Node.Id node, Topologies topologies, int startIndex)
{
int i = Math.max(1, startIndex);
int mi = topologies.size();
if (i == mi)
return topologies.oldestEpoch();
Ranges latest = topologies.get(i - 1).rangesForNode(node);
while (i < mi)
{
Topology topology = topologies.get(i);
Ranges ranges = topology.rangesForNode(node);
if (!ranges.equals(latest))
break;
++i;
}
return topologies.get(i - 1).epoch();
}
public static PartialRoute<?> computeScope(Node.Id node, Topologies topologies, FullRoute<?> fullRoute)
{
return computeScope(node, topologies, fullRoute, latestRelevantEpochIndex(node, topologies, fullRoute));
}
public static PartialRoute<?> computeScope(Node.Id node, Topologies topologies, Route<?> route, int startIndex)
{
return computeScope(node, topologies, route, startIndex, Route::slice, PartialRoute::union);
}
// TODO (low priority, clarity): move to Topologies
public static <I, O> O computeScope(Node.Id node, Topologies topologies, I keys, int startIndex, BiFunction<I, Ranges, O> slice, BiFunction<O, O, O> merge)
{
Ranges last = null;
O scope = null;
for (int i = startIndex, mi = topologies.size() ; i < mi ; ++i)
{
Topology topology = topologies.get(i);
Ranges ranges = topology.rangesForNode(node);
if (ranges != last && !ranges.equals(last))
{
O add = slice.apply(keys, ranges);
scope = scope == null ? add : merge.apply(scope, add);
}
last = ranges;
}
if (scope == null)
throw new IllegalArgumentException("No intersection");
return scope;
}
private static boolean doNotComputeProgressKey(Topologies topologies, int startIndex, TxnId txnId, long waitForEpoch)
{
// to understand this calculation we must bear in mind the following:
// - startIndex is the "latest relevant" which means we skip over recent epochs where we are not owners at all,
// i.e. if this node does not participate in the most recent epoch, startIndex > 0
// - waitForEpoch gives us the most recent epoch with differing ownership information, starting from startIndex
// So, we can have some surprising situations arise where a *prior* owner must be contacted for its vote,
// and does not need to wait for the latest ring information because from the point of view of its contribution
// the stale ring information is sufficient, however we do not want it to compute a progress key with this stale
// ring information and mistakenly believe that it is a home shard for the transaction, as it will not receive
// updates for the transaction going forward.
// So in these cases we send a special flag indicating that the progress key should not be computed
// (as it might be done so with stale ring information)
// TODO (low priority, clarity): this would be better defined as "hasProgressKey"
return waitForEpoch < txnId.epoch() && startIndex > 0
&& topologies.get(startIndex).epoch() < txnId.epoch();
}
}