blob: c3308b7002660fb0affc5d0b7491742b1117d59f [file] [log] [blame]
package accord.topology;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import accord.local.Node.Id;
import accord.api.Key;
import accord.txn.Keys;
import accord.utils.IndexedConsumer;
public class Topology extends AbstractCollection<Shard>
{
// TODO: introduce range version of Keys
final Keys starts;
final Shard[] shards;
final Map<Id, Shards.NodeInfo> nodeLookup;
final Keys subsetOfStarts;
final int[] supersetIndexes;
static class NodeInfo
{
final Keys starts;
final int[] supersetIndexes;
NodeInfo(Keys starts, int[] supersetIndexes)
{
this.starts = starts;
this.supersetIndexes = supersetIndexes;
}
@Override
public String toString()
{
return starts.toString();
}
}
public Topology(Shard... shards)
{
this.starts = new Keys(Arrays.stream(shards).map(shard -> shard.start).sorted().collect(Collectors.toList()));
this.shards = shards;
this.subsetOfStarts = starts;
this.supersetIndexes = IntStream.range(0, shards.length).toArray();
this.nodeLookup = new HashMap<>();
Map<Id, List<Integer>> build = new HashMap<>();
for (int i = 0 ; i < shards.length ; ++i)
{
for (Id node : shards[i].nodes)
build.computeIfAbsent(node, ignore -> new ArrayList<>()).add(i);
}
for (Map.Entry<Id, List<Integer>> e : build.entrySet())
{
int[] supersetIndexes = e.getValue().stream().mapToInt(i -> i).toArray();
Keys starts = this.starts.select(supersetIndexes);
nodeLookup.put(e.getKey(), new Shards.NodeInfo(starts, supersetIndexes));
}
}
public Topology(Keys starts, Shard[] shards, Map<Id, Shards.NodeInfo> nodeLookup, Keys subsetOfStarts, int[] supersetIndexes)
{
this.starts = starts;
this.shards = shards;
this.nodeLookup = nodeLookup;
this.subsetOfStarts = subsetOfStarts;
this.supersetIndexes = supersetIndexes;
}
public Shards forNode(Id node)
{
NodeInfo info = nodeLookup.get(node);
if (info == null)
return Shards.EMPTY;
return forKeys(info.starts);
}
public Shard forKey(Key key)
{
int i = starts.floorIndex(key);
return shards[i];
}
public Shards forKeys(Keys select)
{
int subsetIndex = 0;
int count = 0;
int[] newSubset = new int[Math.min(select.size(), subsetOfStarts.size())];
for (int i = 0 ; i < select.size() ; )
{
subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
int supersetIndex = supersetIndexes[subsetIndex];
newSubset[count++] = supersetIndex;
Shard shard = shards[supersetIndex];
i = select.ceilIndex(i, select.size(), shard.end);
}
if (count != newSubset.length)
newSubset = Arrays.copyOf(newSubset, count);
Keys subsetOfKeys = starts.select(newSubset);
return new Shards(starts, shards, nodeLookup, subsetOfKeys, newSubset);
}
/**
* @param on the node to limit our selection to
* @param select may be a superSet of the keys owned by {@code on} but not of this {@code Shards}
*/
public void forEachOn(Id on, Keys select, IndexedConsumer<Shard> consumer)
{
Shards.NodeInfo info = nodeLookup.get(on);
// int nodeIndex = 0;
// int subsetIndex = 0;
// for (int i = select.ceilIndex(info.starts.get(0)) ; i < select.size() ; )
// {
// nodeIndex = info.starts.floorIndex(nodeIndex, info.starts.size(), select.get(i));
// int supersetIndex = info.supersetIndexes[nodeIndex];
// Shard shard = shards[supersetIndex];
// if (shard.end.compareTo(select.get(i)) > 0)
// {
// subsetIndex = Arrays.binarySearch(supersetIndexes, subsetIndex, supersetIndexes.length, supersetIndex);
// consumer.accept(subsetIndex, shard);
// }
// i = select.ceilIndex(i + 1, select.size(), shard.end);
// }
for (int i = 0, j = 0, k = 0 ; i < select.size() && j < supersetIndexes.length && k < info.supersetIndexes.length ;)
{
Key key = select.get(i);
Shard shard = shards[supersetIndexes[j]];
int c = supersetIndexes[j] - info.supersetIndexes[k];
if (c < 0) ++j;
else if (c > 0) ++k;
else if (key.compareTo(shard.start) < 0) ++i;
else if (key.compareTo(shard.end) < 0) { consumer.accept(j, shard); i++; j++; k++; }
else { j++; k++; }
}
}
public void forEachOn(Id on, IndexedConsumer<Shard> consumer)
{
// TODO: this can be done by divide-and-conquer splitting of the lists and recursion, which should be more efficient
Shards.NodeInfo info = nodeLookup.get(on);
int[] a = supersetIndexes, b = info.supersetIndexes;
int ai = 0, bi = 0;
while (ai < a.length && bi < b.length)
{
if (a[ai] == b[bi])
{
consumer.accept(ai, shards[a[ai]]);
++ai; ++bi;
}
else if (a[ai] < b[bi])
{
ai = Arrays.binarySearch(a, ai + 1, a.length, b[bi]);
if (ai < 0) ai = -1 -ai;
}
else
{
bi = Arrays.binarySearch(b, bi + 1, b.length, a[ai]);
if (bi < 0) bi = -1 -bi;
}
}
}
public void forEach(IndexedConsumer<Shard> consumer)
{
for (int i = 0 ; i < supersetIndexes.length ; ++i)
consumer.accept(i, shards[supersetIndexes[i]]);
}
public <T> T[] select(Keys select, T[] indexedByShard, IntFunction<T[]> constructor)
{
List<T> selection = new ArrayList<>();
// int subsetIndex = 0;
// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
// {
// subsetIndex = subsetOfStarts.floorIndex(subsetIndex, subsetOfStarts.size(), select.get(i));
// selection.add(indexedByShard[subsetIndex]);
// Shard shard = shards[supersetIndexes[subsetIndex]];
// i = select.ceilIndex(i + 1, select.size(), shard.end);
// }
// int minSubsetIndex = 0;
// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
// {
// int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
// selection.add(indexedByShard[subsetIndex]);
// minSubsetIndex = subsetIndex + 1;
// if (minSubsetIndex == supersetIndexes.length)
// break;
// Shard shard = shards[supersetIndexes[minSubsetIndex]];
// i = select.ceilIndex(i + 1, select.size(), shard.start);
// }
// int minSubsetIndex = 0;
// for (int i = select.ceilIndex(shards[supersetIndexes[0]].start) ; i < select.size() ; )
// {
// int subsetIndex = subsetOfStarts.floorIndex(minSubsetIndex, subsetOfStarts.size(), select.get(i));
// Shard shard = shards[supersetIndexes[subsetIndex]];
// if (shard.end.compareTo(select.get(i)) > 0)
// selection.add(indexedByShard[subsetIndex]);
// minSubsetIndex = subsetIndex + 1;
// if (minSubsetIndex == supersetIndexes.length)
// break;
//
// shard = shards[supersetIndexes[minSubsetIndex]];
// i = select.ceilIndex(i + 1, select.size(), shard.start);
// }
for (int i = 0, j = 0 ; i < select.size() && j < supersetIndexes.length ;)
{
Key k = select.get(i);
Shard shard = shards[supersetIndexes[j]];
int c = k.compareTo(shard.start);
if (c < 0) ++i;
else if (k.compareTo(shard.end) < 0) { selection.add(indexedByShard[j++]); i++; }
else j++;
}
return selection.toArray(constructor);
}
@Override
public Iterator<Shard> iterator()
{
return IntStream.of(supersetIndexes).mapToObj(i -> shards[i]).iterator();
}
@Override
public int size()
{
return subsetOfStarts.size();
}
public Shard get(int index)
{
return shards[supersetIndexes[index]];
}
}