blob: e4db9991b100c763030e7280448cf361927cbd19 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package accord.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.coordinate.CoordinateGloballyDurable;
import accord.coordinate.CoordinateShardDurable;
import accord.coordinate.CoordinateSyncPoint;
import accord.local.Node;
import accord.local.ShardDistributor;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
import accord.primitives.SyncPoint;
import accord.primitives.TxnId;
import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
* Helper methods and classes to invoke coordination to propagate information about durability.
* Both CoordinateShardDurable and CoordinateGloballyDurable use wall time to loosely coordinate between nodes
* so that they take non-overlapping (hopefully) turns doing the coordination.
* Both of these have a concept of rounds where rounds have a known duration in wall time, and the current round is known
* based on time since the epoch, and the point in time where a node should do something in a given round is known based
* on its index in the sorted list of nodes ids in the current epoch.
* Coordinate globally durable is simpler because they just need to take turns so nodes just calculate when it is their
* turn and invoke CoordinateGloballyDurable.
* CoordinateShardDurable needs nodes to not overlap on the ranges they operate on or the exlusive sync points overlap
* with each other and block progress. A target duration to process the entire ring is set, and then each node in the
* current round has a time in the round that it should start processing, and the time it starts and the subranges it is
* responsible for rotates backwards every round so that a down node doesn't prevent a subrange from being processed.
* The work for CoordinateShardDurable is further subdivided where each subrange a node operates on is divided a fixed
* number of times and then processed one at a time with a fixed wait between them.
* // TODO review will the scheduler shut down on its own or do the individual tasks need to be canceled manually?
* // TODO (expected): cap number of coordinations we can have in flight at once
* Didn't go with recurring because it doesn't play well with async execution of these tasks
public class CoordinateDurabilityScheduling
private static final Logger logger = LoggerFactory.getLogger(CoordinateDurabilityScheduling.class);
private final Node node;
* In each round at each node wait this amount of time between initiating new CoordinateShardDurable
private int frequencyMicros = Ints.saturatedCast(TimeUnit.MILLISECONDS.toMicros(500L));
* In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
* and coordinating the shard durability
private int txnIdLagMicros = Ints.saturatedCast(TimeUnit.SECONDS.toMicros(1L));
* In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
* and coordinating the shard durability
private int durabilityLagMicros = Ints.saturatedCast(TimeUnit.MILLISECONDS.toMicros(500L));
* Target for how often the entire ring should be processed in microseconds. Every node will start at an offset in the current round that is based
* on this value / by (total # replicas * its index in the current round). The current round is determined by dividing time since the epoch by this
* duration.
private int shardCycleTimeMicros = Ints.saturatedCast(TimeUnit.SECONDS.toMicros(30));
* Every node will independently attempt to invoke CoordinateGloballyDurable
* with a target gap between invocations of globalCycleTimeMicros
* This is done by nodes taking turns for each scheduled attempt that is due by calculating what # attempt is
* next for the current node ordinal in the cluster and time since the unix epoch and attempting to invoke then. If all goes
* well they end up doing it periodically in a timely fashion with the target gap achieved.
* TODO (desired): run this more often, but for less than the whole cluster, patterned in such a way as to ensure rapid cross-pollination
private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);
private Topology currentGlobalTopology;
private final Map<Shard, Integer> localIndexes = new HashMap<>();
private int globalIndex;
private long nextGlobalSyncTimeMicros;
private long prevShardSyncTimeMicros;
volatile boolean stop;
public CoordinateDurabilityScheduling(Node node)
this.node = node;
public void setFrequency(int frequency, TimeUnit units)
this.frequencyMicros = Ints.saturatedCast(units.toMicros(frequency));
public void setTxnIdLag(int txnIdLag, TimeUnit units)
this.txnIdLagMicros = Ints.saturatedCast(units.toMicros(txnIdLag));
public void setDurabilityLag(int durabilityLag, TimeUnit units)
this.durabilityLagMicros = Ints.saturatedCast(units.toMicros(durabilityLag));
public void setShardCycleTime(int shardCycleTime, TimeUnit units)
this.shardCycleTimeMicros = Ints.saturatedCast(units.toMicros(shardCycleTime));
public void setGlobalCycleTime(long globalCycleTime, TimeUnit units)
this.globalCycleTimeMicros = units.toMicros(globalCycleTime);
* Schedule regular invocations of CoordinateShardDurable and CoordinateGloballyDurable
public void start()
Invariants.checkState(!stop); // cannot currently restart safely
long nowMicros = node.unix(MICROSECONDS);
prevShardSyncTimeMicros = nowMicros;
public void stop()
stop = true;
private void requeue()
if (!stop)
node.scheduler().once(this::run, frequencyMicros, MICROSECONDS);
* Schedule the first CoordinateShardDurable execution for the current round. Sub-steps will be scheduled after
* each sub-step completes, and once all are completed scheduleCoordinateShardDurable is called again.
private void run()
if (stop)
if (currentGlobalTopology.size() == 0)
long nowMicros = node.unix(MICROSECONDS);
if (nextGlobalSyncTimeMicros <= nowMicros)
List<Ranges> coordinate = rangesToShardSync(nowMicros);
prevShardSyncTimeMicros = nowMicros;
if (coordinate.isEmpty())
logger.trace("Nothing pending in schedule for time slot at {}", nowMicros);
logger.trace("Scheduling CoordinateShardDurable for {} at {}", coordinate, nowMicros);
for (Ranges ranges : coordinate)
* The first step for coordinating shard durable is to run an exclusive sync point
* the result of which can then be used to run
private void startShardSync(Ranges ranges)
TxnId at = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
node.scheduler().once(() -> node.withEpoch(at.epoch(), () -> {
CoordinateSyncPoint.exclusive(node, at, ranges)
.addCallback((success, fail) -> {
if (fail != null) logger.trace("Exception coordinating exclusive sync point for local shard durability of {}", ranges, fail);
else coordinateShardDurableAfterExclusiveSyncPoint(node, success);
}), txnIdLagMicros, MICROSECONDS);
private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint exclusiveSyncPoint)
node.scheduler().once(() -> {
node.commandStores().any().execute(() -> {
CoordinateShardDurable.coordinate(node, exclusiveSyncPoint)
.addCallback((success, fail) -> {
if (fail != null)
logger.trace("Exception coordinating local shard durability, will retry immediately", fail);
coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint);
}, durabilityLagMicros, MICROSECONDS);
private void startGlobalSync()
long epoch = node.epoch();
AsyncChain<AsyncResult<Void>> resultChain = node.withEpoch(epoch, () -> node.commandStores().any().submit(() -> CoordinateGloballyDurable.coordinate(node, epoch)));
resultChain.begin((success, fail) -> {
if (fail != null) logger.trace("Exception initiating coordination of global durability", fail);
else logger.trace("Successful coordination of global durability");
catch (Exception e)
logger.error("Exception invoking withEpoch to start coordination for global durability", e);
private List<Ranges> rangesToShardSync(long nowMicros)
ShardDistributor distributor = node.commandStores().shardDistributor();
List<Ranges> result = new ArrayList<>();
for (Map.Entry<Shard, Integer> e : localIndexes.entrySet())
Shard shard = e.getKey();
int index = e.getValue();
long microsOffset = (index * shardCycleTimeMicros) / shard.rf();
int shardCycleTimeMicros = Math.max(this.shardCycleTimeMicros, Ints.saturatedCast(shard.rf() * 3L * frequencyMicros));
long prevSyncTimeMicros = Math.max(prevShardSyncTimeMicros, nowMicros - ((shardCycleTimeMicros / shard.rf()) / 2L));
int from = (int) ((prevSyncTimeMicros + microsOffset) % shardCycleTimeMicros);
int to = (int) ((nowMicros + microsOffset) % shardCycleTimeMicros);
List<Range> ranges = new ArrayList<>();
if (from > to)
Range range = distributor.splitRange(shard.range, to, shardCycleTimeMicros, shardCycleTimeMicros);
if (range != null)
to = from;
from = 0;
Range range = distributor.splitRange(shard.range, from, to, shardCycleTimeMicros);
if (range != null)
if (!ranges.isEmpty())
result.add(Ranges.of(ranges.toArray(new Range[0])));
return result;
private void updateTopology()
Topology latestGlobal = node.topology().current();
if (latestGlobal == currentGlobalTopology)
Topology latestLocal = latestGlobal.forNode(;
if (latestLocal.size() == 0)
currentGlobalTopology = latestGlobal;
List<Node.Id> ids = new ArrayList<>(latestGlobal.nodes());
globalIndex = ids.indexOf(;
for (Shard shard : latestLocal.shards())
localIndexes.put(shard, shard.sortedNodes.indexOf(;
* Based on the current unix time (simulated or otherwise) calculate the wait time in microseconds until the next turn of this
* node for some activity with a target gap between nodes doing the activity.
* This is done by taking the index of the node in the current topology and the total number of nodes
* and then using the target gap between invocations to calculate a "round" duration and point of time each node
* should have its turn in each round based on its index and calculating the time to the next turn for that node will occur.
* It's assumed it is fine if nodes overlap or reorder or skip for whatever activity we are picking turns for as long as it is approximately
* the right pacing.
private void setNextGlobalSyncTime(long nowMicros)
if (currentGlobalTopology == null)
nextGlobalSyncTimeMicros = nowMicros;
// How long it takes for all nodes to go once
long totalRoundDuration = currentGlobalTopology.nodes().size() * globalCycleTimeMicros;
long startOfCurrentRound = (nowMicros / totalRoundDuration) * totalRoundDuration;
// In a given round at what time in the round should this node take its turn
long ourOffsetInRound = globalIndex * globalCycleTimeMicros;
long targetTimeInCurrentRound = startOfCurrentRound + ourOffsetInRound;
long targetTime = targetTimeInCurrentRound;
// If our time to run in the current round already passed then schedule it in the next round
if (targetTimeInCurrentRound < nowMicros)
targetTime += totalRoundDuration;
nextGlobalSyncTimeMicros = targetTime - nowMicros;