| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.impl; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import accord.api.Data; |
| import accord.api.DataStore; |
| import accord.coordinate.CoordinateSyncPoint; |
| import accord.coordinate.FetchCoordinator; |
| import accord.local.CommandStore; |
| import accord.local.Node; |
| import accord.messages.Callback; |
| import accord.messages.MessageType; |
| import accord.messages.ReadData.CommitOrReadNack; |
| import accord.messages.ReadData.ReadOk; |
| import accord.messages.ReadData.ReadReply; |
| import accord.messages.WaitUntilAppliedAndReadData; |
| import accord.primitives.PartialDeps; |
| import accord.primitives.PartialTxn; |
| import accord.primitives.Ranges; |
| import accord.primitives.SyncPoint; |
| import accord.primitives.Timestamp; |
| import accord.primitives.TxnId; |
| import accord.utils.Invariants; |
| import accord.utils.async.AsyncChains; |
| import accord.utils.async.AsyncResult; |
| import accord.utils.async.AsyncResults; |
| import javax.annotation.Nullable; |
| |
| import static accord.messages.ReadData.CommitOrReadNack.Insufficient; |
| import static accord.primitives.Routables.Slice.Minimal; |
| |
| public abstract class AbstractFetchCoordinator extends FetchCoordinator |
| { |
| private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class); |
| |
| static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult |
| { |
| final AbstractFetchCoordinator coordinator; |
| |
| FetchResult(AbstractFetchCoordinator coordinator) |
| { |
| this.coordinator = coordinator; |
| } |
| |
| @Override |
| public void abort(Ranges abort) |
| { |
| coordinator.abort(abort); |
| } |
| } |
| |
| static class Key |
| { |
| final Node.Id id; |
| final Ranges ranges; |
| |
| Key(Node.Id id, Ranges ranges) |
| { |
| this.id = id; |
| this.ranges = ranges; |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| return (31 + id.hashCode()) * 31 + ranges.hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) |
| { |
| if (this == obj) return true; |
| if (!(obj instanceof Key)) return false; |
| Key that = (Key) obj; |
| return id.equals(that.id) && ranges.equals(that.ranges); |
| } |
| } |
| |
| final DataStore.FetchRanges fetchRanges; |
| final CommandStore commandStore; |
| final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>(); |
| final FetchResult result = new FetchResult(this); |
| final List<AsyncResult<Void>> persisting = new ArrayList<>(); |
| |
| protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) |
| { |
| super(node, ranges, syncPoint, fetchRanges); |
| this.fetchRanges = fetchRanges; |
| this.commandStore = commandStore; |
| } |
| |
| public CommandStore commandStore() |
| { |
| return commandStore; |
| } |
| |
| protected abstract PartialTxn rangeReadTxn(Ranges ranges); |
| |
| protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); |
| |
| protected FetchRequest newFetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) |
| { |
| return new FetchRequest(sourceEpoch, syncId, ranges, partialDeps, rangeReadTxn(ranges)); |
| } |
| |
| @Override |
| public void contact(Node.Id to, Ranges ranges) |
| { |
| Key key = new Key(to, ranges); |
| inflight.put(key, starting(to, ranges)); |
| Ranges ownedRanges = ownedRangesForNode(to); |
| Invariants.checkArgument(ownedRanges.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, ownedRanges); |
| PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); |
| node.send(to, newFetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadReply>() |
| { |
| @Override |
| public void onSuccess(Node.Id from, ReadReply reply) |
| { |
| if (!reply.isOk()) |
| { |
| if (reply == Insufficient) |
| { |
| CoordinateSyncPoint.sendApply(node, from, syncPoint); |
| } |
| else |
| { |
| fail(to, new RuntimeException(reply.toString())); |
| inflight.remove(key).cancel(); |
| switch ((CommitOrReadNack) reply) |
| { |
| default: throw new AssertionError("Unhandled enum: " + reply); |
| case Invalid: |
| case Redundant: |
| case Rejected: |
| // TODO (expected): stop fetch sync points from garbage collecting too quickly |
| throw new AssertionError(String.format("Unexpected reply: %s", reply)); |
| } |
| } |
| return; |
| } |
| |
| FetchResponse ok = (FetchResponse) reply; |
| Ranges received; |
| if (ok.unavailable != null) |
| { |
| unavailable(to, ok.unavailable); |
| if (ok.data == null) |
| { |
| inflight.remove(key).cancel(); |
| return; |
| } |
| received = ranges.subtract(ok.unavailable); |
| } |
| else |
| { |
| received = ranges; |
| } |
| |
| // TODO (now): make sure it works if invoked in either order |
| inflight.remove(key).started(ok.maxApplied); |
| onReadOk(to, commandStore, ok.data, received); |
| // received must be invoked after submitting the persistence future, as it triggers onDone |
| // which creates a ReducingFuture over {@code persisting} |
| } |
| |
| @Override |
| public void onFailure(Node.Id from, Throwable failure) |
| { |
| inflight.remove(key).cancel(); |
| fail(from, failure); |
| } |
| |
| @Override |
| public void onCallbackFailure(Node.Id from, Throwable failure) |
| { |
| // TODO (soon) |
| logger.error("Fetch coordination failure from " + from, failure); |
| } |
| }); |
| } |
| |
| public FetchResult result() |
| { |
| return result; |
| } |
| |
| @Override |
| protected void onDone(Ranges success, Throwable failure) |
| { |
| if (failure != null || success.isEmpty()) result.setFailure(failure); |
| else if (persisting.isEmpty()) result.setSuccess(Ranges.EMPTY); |
| else AsyncChains.reduce(persisting, (a, b) -> null) |
| .begin((s, f) -> { |
| if (f == null) result.setSuccess(ranges); |
| else result.setFailure(f); |
| }); |
| } |
| |
| @Override |
| public void start() |
| { |
| super.start(); |
| } |
| |
| void abort(Ranges abort) |
| { |
| // TODO (expected): implement abort |
| } |
| |
| public static class FetchRequest extends WaitUntilAppliedAndReadData |
| { |
| public final PartialDeps partialDeps; |
| |
| public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) |
| { |
| super(syncId, ranges, sourceEpoch, partialTxn); |
| this.partialDeps = partialDeps; |
| } |
| |
| @Override |
| protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) |
| { |
| Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal); |
| super.readComplete(commandStore, result, reportUnavailable); |
| } |
| |
| @Override |
| protected void onAllSuccess(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Throwable fail) |
| { |
| // TODO (review): If the fetch response actually does some streaming, but we send back the error |
| // it is a lot of work and data that might move and be unaccounted for at the coordinator |
| node.reply(replyTo, replyContext, fail == null ? new FetchResponse(unavailable, data, maxApplied()) : null, fail); |
| } |
| |
| protected Timestamp maxApplied() |
| { |
| return null; |
| } |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.FETCH_DATA_REQ; |
| } |
| } |
| |
| public static class FetchResponse extends ReadOk |
| { |
| public final @Nullable Timestamp maxApplied; |
| public FetchResponse(@Nullable Ranges unavailable, @Nullable Data data, @Nullable Timestamp maxApplied) |
| { |
| super(unavailable, data); |
| this.maxApplied = maxApplied; |
| } |
| |
| @Override |
| public MessageType type() |
| { |
| return MessageType.FETCH_DATA_RSP; |
| } |
| } |
| } |