| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.burn; |
| |
| import accord.api.TestableConfigurationService; |
| import accord.impl.InMemoryCommandStore; |
| import accord.coordinate.FetchData; |
| import accord.impl.InMemoryCommandStores; |
| import accord.local.Command; |
| import accord.local.Commands; |
| import accord.local.Node; |
| import accord.local.Status; |
| import accord.messages.CheckStatus.CheckStatusOk; |
| import accord.primitives.*; |
| import accord.topology.Shard; |
| import accord.topology.Topology; |
| import accord.utils.MapReduce; |
| import accord.utils.MessageTask; |
| import accord.utils.Invariants; |
| import accord.utils.async.AsyncChain; |
| import accord.utils.async.AsyncChains; |
| import accord.utils.async.AsyncResult; |
| import accord.utils.async.AsyncResults; |
| import com.google.common.collect.Sets; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.stream.Stream; |
| |
| import static accord.coordinate.Invalidate.invalidate; |
| import static accord.local.PreLoadContext.contextFor; |
| import static accord.local.Status.*; |
| import static accord.local.Status.Known.*; |
| import static accord.utils.async.AsyncChains.getUninterruptibly; |
| |
| public class TopologyUpdates |
| { |
| private static final Logger logger = LoggerFactory.getLogger(TopologyUpdates.class); |
| |
| private static class CommandSync |
| { |
| final TxnId txnId; |
| final Status status; |
| final Route<?> route; |
| final Timestamp executeAt; |
| final long fromEpoch; |
| final long toEpoch; |
| |
| public CommandSync(TxnId txnId, CheckStatusOk status, long srcEpoch, long trgEpoch) |
| { |
| Invariants.checkArgument(status.saveStatus.hasBeen(Status.PreAccepted)); |
| Invariants.checkState(status.route != null && !status.route.isEmpty()); |
| this.txnId = txnId; |
| this.status = status.saveStatus.status; |
| this.route = status.route; |
| this.executeAt = status.executeAt; |
| this.fromEpoch = srcEpoch; |
| this.toEpoch = trgEpoch; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "CommandSync{" + "txnId:" + txnId + ", fromEpoch:" + fromEpoch + ", toEpoch:" + toEpoch + '}'; |
| } |
| |
| public void process(Node node, Consumer<Boolean> onDone) |
| { |
| if (!node.topology().hasEpoch(toEpoch)) |
| { |
| node.configService().fetchTopologyForEpoch(toEpoch); |
| node.topology().awaitEpoch(toEpoch).addCallback(() -> process(node, onDone)); |
| return; |
| } |
| |
| AsyncChain<Status> statusChain = node.commandStores().mapReduce(contextFor(txnId), route, toEpoch, toEpoch, |
| MapReduce.of(safeStore -> safeStore.command(txnId).current().status(), |
| (a, b) -> a.compareTo(b) <= 0 ? a : b)); |
| AsyncResult<Object> sync = statusChain.map(minStatus -> { |
| if (minStatus == null || minStatus.phase.compareTo(status.phase) >= 0) |
| { |
| // TODO (low priority): minStatus == null means we're sending redundant messages |
| onDone.accept(true); |
| return null; |
| } |
| |
| BiConsumer<Known, Throwable> callback = (outcome, fail) -> { |
| if (fail != null) |
| process(node, onDone); |
| else if (outcome == Nothing) |
| invalidate(node, txnId, route.with(route.homeKey()), (i1, i2) -> process(node, onDone)); |
| else |
| onDone.accept(true); |
| }; |
| switch (status) |
| { |
| case NotWitnessed: |
| onDone.accept(true); |
| break; |
| case PreAccepted: |
| case Accepted: |
| case AcceptedInvalidate: |
| FetchData.fetch(DefinitionOnly, node, txnId, route, toEpoch, callback); |
| break; |
| case Committed: |
| case ReadyToExecute: |
| FetchData.fetch(Committed.minKnown, node, txnId, route, toEpoch, callback); |
| break; |
| case PreApplied: |
| case Applied: |
| node.withEpoch(Math.max(executeAt.epoch(), toEpoch), () -> { |
| FetchData.fetch(PreApplied.minKnown, node, txnId, route, executeAt, toEpoch, callback); |
| }); |
| break; |
| case Invalidated: |
| AsyncChain<Void> invalidate = node.forEachLocal(contextFor(txnId), route, txnId.epoch(), toEpoch, safeStore -> { |
| Commands.commitInvalidate(safeStore, txnId); |
| }); |
| |
| dieExceptionally(invalidate.addCallback(((unused, failure) -> onDone.accept(failure == null))).beginAsResult()); |
| } |
| return null; |
| }).beginAsResult(); |
| dieExceptionally(sync); |
| } |
| } |
| |
| private final Set<Long> pendingTopologies = Sets.newConcurrentHashSet(); |
| |
| public static <T> BiConsumer<T, Throwable> dieOnException() |
| { |
| return (result, throwable) -> { |
| if (throwable != null) |
| { |
| logger.error("", throwable); |
| System.exit(1); |
| } |
| }; |
| } |
| |
| public static <T> AsyncResult<T> dieExceptionally(AsyncResult<T> stage) |
| { |
| stage.addCallback(dieOnException()); |
| return stage; |
| } |
| |
| public MessageTask notify(Node originator, Collection<Node.Id> cluster, Topology update) |
| { |
| pendingTopologies.add(update.epoch()); |
| return MessageTask.begin(originator, cluster, "TopologyNotify:" + update.epoch(), (node, from, onDone) -> { |
| long nodeEpoch = node.topology().epoch(); |
| if (nodeEpoch + 1 < update.epoch()) |
| onDone.accept(false); |
| ((TestableConfigurationService) node.configService()).reportTopology(update); |
| onDone.accept(true); |
| }); |
| } |
| |
| private static Collection<Node.Id> allNodesFor(Txn txn, Topology... topologies) |
| { |
| Set<Node.Id> result = new HashSet<>(); |
| for (Topology topology : topologies) |
| result.addAll(topology.forSelection(txn.keys().toUnseekables()).nodes()); |
| return result; |
| } |
| |
| private static AsyncChain<Stream<MessageTask>> syncEpochCommands(Node node, long srcEpoch, Ranges ranges, Function<CommandSync, Collection<Node.Id>> recipients, long trgEpoch, boolean committedOnly) |
| { |
| Map<TxnId, CheckStatusOk> syncMessages = new ConcurrentHashMap<>(); |
| Consumer<Command> commandConsumer = command -> syncMessages.merge(command.txnId(), new CheckStatusOk(node, command), CheckStatusOk::merge); |
| AsyncChain<Void> start; |
| if (committedOnly) |
| start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forCommittedInEpoch(ranges, srcEpoch, commandConsumer)); |
| else |
| start = node.commandStores().forEach(commandStore -> InMemoryCommandStore.inMemory(commandStore).forEpochCommands(ranges, srcEpoch, commandConsumer)); |
| |
| return start.map(ignore -> syncMessages.entrySet().stream().map(e -> { |
| CommandSync sync = new CommandSync(e.getKey(), e.getValue(), srcEpoch, trgEpoch); |
| return MessageTask.of(node, recipients.apply(sync), sync.toString(), sync::process); |
| })); |
| } |
| |
| private static final boolean PREACCEPTED = false; |
| private static final boolean COMMITTED_ONLY = true; |
| |
| /** |
| * Syncs all replicated commands. Overkill, but useful for confirming issues in optimizedSync |
| */ |
| private static AsyncChain<Stream<MessageTask>> thoroughSync(Node node, long syncEpoch) |
| { |
| Topology syncTopology = node.configService().getTopologyForEpoch(syncEpoch); |
| Topology localTopology = syncTopology.forNode(node.id()); |
| Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().withUnsyncedEpochs(cmd.route, syncEpoch + 1).nodes(); |
| |
| Ranges ranges = localTopology.ranges(); |
| List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>(); |
| for (long epoch=1; epoch<=syncEpoch; epoch++) |
| work.add(syncEpochCommands(node, epoch, ranges, allNodes, syncEpoch, COMMITTED_ONLY)); |
| return AsyncChains.reduce(work, Stream.empty(), Stream::concat); |
| } |
| |
| /** |
| * Syncs all newly replicated commands when nodes are gaining ranges and the current epoch |
| */ |
| private static AsyncChain<Stream<MessageTask>> optimizedSync(Node node, long srcEpoch) |
| { |
| long trgEpoch = srcEpoch + 1; |
| Topology syncTopology = node.configService().getTopologyForEpoch(srcEpoch); |
| Topology localTopology = syncTopology.forNode(node.id()); |
| Topology nextTopology = node.configService().getTopologyForEpoch(trgEpoch); |
| Function<CommandSync, Collection<Node.Id>> allNodes = cmd -> node.topology().preciseEpochs(cmd.route, trgEpoch, trgEpoch).nodes(); |
| |
| // backfill new replicas with operations from prior epochs |
| List<AsyncChain<Stream<MessageTask>>> work = new ArrayList<>(localTopology.shards().size()); |
| for (Shard syncShard : localTopology.shards()) |
| { |
| for (Shard nextShard : nextTopology.shards()) |
| { |
| // do nothing if there's no change |
| if (syncShard.range.equals(nextShard.range) && syncShard.nodeSet.equals(nextShard.nodeSet)) |
| continue; |
| |
| Range intersection = syncShard.range.intersection(nextShard.range); |
| |
| if (intersection == null) |
| continue; |
| |
| Set<Node.Id> newNodes = Sets.difference(nextShard.nodeSet, syncShard.nodeSet); |
| |
| if (newNodes.isEmpty()) |
| continue; |
| |
| Ranges ranges = Ranges.single(intersection); |
| for (long epoch=1; epoch<srcEpoch; epoch++) |
| work.add(syncEpochCommands(node, epoch, ranges, cmd -> newNodes, trgEpoch, COMMITTED_ONLY)); |
| } |
| } |
| |
| // update all current and future replicas with the contents of the sync epoch |
| work.add(syncEpochCommands(node, srcEpoch, localTopology.ranges(), allNodes, trgEpoch, PREACCEPTED)); |
| |
| return AsyncChains.reduce(work, Stream.empty(), Stream::concat); |
| } |
| |
| private static AsyncChain<Void> sync(Node node, long syncEpoch) |
| { |
| return optimizedSync(node, syncEpoch) |
| .flatMap(messageStream -> { |
| Iterator<MessageTask> iter = messageStream.iterator(); |
| if (!iter.hasNext()) return AsyncResults.success(null); |
| |
| MessageTask first = iter.next(); |
| MessageTask last = first; |
| while (iter.hasNext()) |
| { |
| MessageTask next = iter.next(); |
| last.addCallback(next); |
| last = next; |
| } |
| |
| first.run(); |
| return dieExceptionally(last); |
| }); |
| } |
| |
| public AsyncResult<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster) |
| { |
| AsyncResult<Void> result = dieExceptionally(sync(originator, epoch) |
| .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from, onDone) -> { |
| node.onEpochSyncComplete(originator.id(), epoch); |
| onDone.accept(true); |
| })).beginAsResult()); |
| result.addCallback((unused, throwable) -> pendingTopologies.remove(epoch)); |
| return result; |
| } |
| |
| public int pendingTopologies() |
| { |
| return pendingTopologies.size(); |
| } |
| } |