blob: 7092dd31915b160f7177bfb84aeeb2daa7512783 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.burn;
import accord.api.TestableConfigurationService;
import accord.local.AgentExecutor;
import accord.local.Node;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.MessageTask;
import org.agrona.collections.Long2ObjectHashMap;
import java.util.*;
public class TopologyUpdates
{
private final Long2ObjectHashMap<Map<Node.Id, Ranges>> pendingTopologies = new Long2ObjectHashMap<>();
private final AgentExecutor executor;
public TopologyUpdates(AgentExecutor executor)
{
this.executor = executor;
}
public synchronized MessageTask notify(Node originator, Topology prev, Topology update)
{
Set<Node.Id> nodes = new TreeSet<>(prev.nodes());
nodes.addAll(update.nodes());
Map<Node.Id, Ranges> nodeToNewRanges = new HashMap<>();
for (Node.Id node : nodes)
{
Ranges newRanges = update.rangesForNode(node).difference(prev.rangesForNode(node));
nodeToNewRanges.put(node, newRanges);
}
pendingTopologies.put(update.epoch(), nodeToNewRanges);
return MessageTask.begin(originator, nodes, executor, "TopologyNotify:" + update.epoch(), (node, from, onDone) -> {
long nodeEpoch = node.topology().epoch();
if (nodeEpoch + 1 < update.epoch())
onDone.accept(false);
((TestableConfigurationService) node.configService()).reportTopology(update);
onDone.accept(true);
});
}
public synchronized void syncComplete(Node originator, Collection<Node.Id> cluster, long epoch)
{
Map<Node.Id, Ranges> pending = pendingTopologies.get(epoch);
if (pending == null || null == pending.remove(originator.id()))
throw new AssertionError();
if (pending.isEmpty())
pendingTopologies.remove(epoch);
MessageTask.begin(originator, cluster, executor, "SyncComplete:" + epoch, (node, from, onDone) -> {
node.onRemoteSyncComplete(originator.id(), epoch);
onDone.accept(true);
});
}
public boolean isPending(Range range, Node.Id id)
{
return pendingTopologies.entrySet().stream().anyMatch(e -> {
Ranges ranges = e.getValue().get(id);
return ranges != null && ranges.intersects(range);
});
}
public int pendingTopologies()
{
return pendingTopologies.size();
}
}