blob: a8b8a98e3d7d98e1600e86af26547f065f77840c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.topology;
import accord.api.TopologySorter;
import accord.local.Node;
import accord.local.Node.Id;
import accord.primitives.Ranges;
import accord.utils.IndexedConsumer;
import com.google.common.collect.Sets;
import accord.utils.Invariants;
import java.util.*;
// TODO (desired, efficiency/clarity): since Topologies are rarely needed, should optimise API for single topology case
// (e.g. at least implementing Topologies by Topology)
public interface Topologies extends TopologySorter
{
Topology current();
Topology forEpoch(long epoch);
long oldestEpoch();
default long currentEpoch()
{
return current().epoch;
}
// topologies are stored in reverse epoch order, with the highest epoch at idx 0
Topology get(int i);
int size();
int totalShards();
boolean contains(Id to);
Set<Node.Id> nodes();
Set<Node.Id> copyOfNodes();
int estimateUniqueNodes();
Ranges computeRangesForNode(Id node);
int maxShardsPerEpoch();
default void forEach(IndexedConsumer<Topology> consumer)
{
for (int i=0, mi=size(); i<mi; i++)
consumer.accept(get(i), i);
}
static boolean equals(Topologies t, Object o)
{
if (o == t)
return true;
if (!(o instanceof Topologies))
return false;
Topologies that = (Topologies) o;
if (t.size() != that.size())
return false;
for (int i=0, mi=t.size(); i<mi; i++)
{
if (!t.get(i).equals(that.get(i)))
return false;
}
return true;
}
static int hashCode(Topologies t)
{
int hashCode = 1;
for (int i=0, mi=t.size(); i<mi; i++) {
hashCode = 31 * hashCode + t.get(i).hashCode();
}
return hashCode;
}
static String toString(Topologies t)
{
StringBuilder sb = new StringBuilder("[");
for (int i=0, mi=t.size(); i<mi; i++)
{
if (i > 0)
sb.append(", ");
sb.append(t.get(i).toString());
}
sb.append("]");
return sb.toString();
}
class Single implements Topologies
{
private final TopologySorter sorter;
private final Topology topology;
public Single(TopologySorter.Supplier sorter, Topology topology)
{
this.topology = topology;
this.sorter = sorter.get(this);
}
public Single(TopologySorter sorter, Topology topology)
{
this.topology = topology;
this.sorter = sorter;
}
@Override
public Topology current()
{
return topology;
}
@Override
public Topology forEpoch(long epoch)
{
if (topology.epoch != epoch)
throw new IndexOutOfBoundsException();
return topology;
}
@Override
public long oldestEpoch()
{
return currentEpoch();
}
@Override
public Topology get(int i)
{
if (i != 0)
throw new IndexOutOfBoundsException(Integer.toString(i));
return topology;
}
@Override
public int size()
{
return 1;
}
@Override
public int totalShards()
{
return topology.size();
}
@Override
public boolean contains(Id to)
{
return topology.contains(to);
}
@Override
public Set<Node.Id> nodes()
{
return topology.nodes();
}
@Override
public Set<Id> copyOfNodes()
{
return new HashSet<>(nodes());
}
@Override
public int estimateUniqueNodes()
{
return topology.nodes().size();
}
@Override
public Ranges computeRangesForNode(Id node)
{
return topology.rangesForNode(node);
}
@Override
public int maxShardsPerEpoch()
{
return topology.size();
}
@Override
public boolean equals(Object obj)
{
return Topologies.equals(this, obj);
}
@Override
public int hashCode()
{
return Topologies.hashCode(this);
}
@Override
public String toString()
{
return Topologies.toString(this);
}
@Override
public int compare(Id node1, Id node2, ShardSelection shards)
{
return sorter.compare(node1, node2, shards);
}
}
class Multi implements Topologies
{
private final TopologySorter sorter;
private final List<Topology> topologies;
private final int maxShardsPerEpoch;
public Multi(TopologySorter.Supplier sorter, int initialCapacity)
{
this.topologies = new ArrayList<>(initialCapacity);
this.sorter = sorter.get(this);
int maxShardsPerEpoch = 0;
for (int i = 0 ; i < topologies.size() ; ++i)
maxShardsPerEpoch = Math.max(maxShardsPerEpoch, topologies.get(i).size());
this.maxShardsPerEpoch = maxShardsPerEpoch;
}
public Multi(TopologySorter.Supplier sorter, Topology... topologies)
{
this(sorter, topologies.length);
for (Topology topology : topologies)
add(topology);
}
@Override
public Topology current()
{
return get(0);
}
@Override
public Topology forEpoch(long epoch)
{
long index = get(0).epoch - epoch;
if (index < 0 || index > size())
throw new IndexOutOfBoundsException();
return get((int)index);
}
@Override
public long oldestEpoch()
{
return get(size() - 1).epoch;
}
@Override
public Topology get(int i)
{
return topologies.get(i);
}
@Override
public int size()
{
return topologies.size();
}
@Override
public int totalShards()
{
int count = 0;
for (int i=0, mi= topologies.size(); i<mi; i++)
count += topologies.get(i).size();
return count;
}
@Override
public boolean contains(Id to)
{
for (Topology topology : topologies)
{
if (topology.contains(to))
return true;
}
return false;
}
@Override
public int estimateUniqueNodes()
{
// just guess at one additional node per epoch, and at most twice as many nodes
int estSize = get(0).nodes().size();
return Math.min(estSize * 2, estSize + size() - 1);
}
@Override
public Set<Node.Id> nodes()
{
Set<Node.Id> result = Sets.newHashSetWithExpectedSize(estimateUniqueNodes());
for (int i=0,mi=size(); i<mi; i++)
result.addAll(get(i).nodes());
return result;
}
@Override
public Set<Id> copyOfNodes()
{
return nodes();
}
@Override
public Ranges computeRangesForNode(Id node)
{
Ranges ranges = Ranges.EMPTY;
for (int i = 0, mi = size() ; i < mi ; i++)
ranges = ranges.with(get(i).rangesForNode(node));
return ranges;
}
@Override
public int maxShardsPerEpoch()
{
return maxShardsPerEpoch;
}
public void add(Topology topology)
{
Invariants.checkArgument(topologies.isEmpty() || topology.epoch == topologies.get(topologies.size() - 1).epoch - 1);
topologies.add(topology);
}
@Override
public boolean equals(Object obj)
{
return Topologies.equals(this, obj);
}
@Override
public int hashCode()
{
return Topologies.hashCode(this);
}
@Override
public String toString()
{
return Topologies.toString(this);
}
@Override
public int compare(Id node1, Id node2, ShardSelection shards)
{
return sorter.compare(node1, node2, shards);
}
}
}