blob: bec8468358c2ad9f3ae28d40a55072e26ea59be4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package accord.topology;
import accord.local.Node;
import accord.primitives.Range;
import accord.primitives.RoutingKeys;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import static accord.Utils.*;
import static accord.impl.IntKey.keys;
import static accord.impl.IntKey.range;
import static accord.impl.SizeOfIntersectionSorter.SUPPLIER;
public class TopologyManagerTest
{
private static final Node.Id ID = new Node.Id(1);
@Test
void fastPathReconfiguration()
{
Range range = range(100, 200);
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
Assertions.assertSame(Topology.EMPTY, service.current());
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
service.onEpochSyncComplete(id(1), 2);
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
service.onEpochSyncComplete(id(2), 2);
Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
}
private static TopologyManager tracker()
{
Topology topology1 = topology(1,
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
Topology topology2 = topology(2,
shard(range(100, 200), idList(1, 2, 3), idSet(3, 4)),
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
return service;
}
@Test
void syncCompleteFor()
{
TopologyManager service = tracker();
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
service.onEpochSyncComplete(id(1), 2);
service.onEpochSyncComplete(id(2), 2);
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
Assertions.assertTrue(service.getEpochStateUnsafe(2).syncCompleteFor(keys(150).toUnseekables()));
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncCompleteFor(keys(250).toUnseekables()));
}
/**
* Epochs should only report being synced if every preceding epoch is also reporting synced
*/
@Test
void existingEpochPendingSync()
{
Range range = range(100, 200);
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
service.onTopologyUpdate(topology3);
Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
// sync epoch 3
service.onEpochSyncComplete(id(1), 3);
service.onEpochSyncComplete(id(2), 3);
Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
Assertions.assertFalse(service.getEpochStateUnsafe(3).syncComplete());
// sync epoch 2
service.onEpochSyncComplete(id(2), 2);
service.onEpochSyncComplete(id(3), 2);
Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete());
}
/**
* If a node receives sync acks for epochs it's not aware of, it should apply them when it finds out about the epoch
*/
@Test
void futureEpochPendingSync()
{
Range range = range(100, 200);
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
// Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(3, 4)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
// sync epoch 2
service.onEpochSyncComplete(id(2), 2);
service.onEpochSyncComplete(id(3), 2);
// learn of epoch 2
service.onTopologyUpdate(topology2);
Assertions.assertTrue(service.getEpochStateUnsafe(1).syncComplete());
Assertions.assertTrue(service.getEpochStateUnsafe(2).syncComplete());
// Assertions.assertTrue(service.getEpochStateUnsafe(3).syncComplete());
}
@Test
void forKeys()
{
Range range = range(100, 200);
Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)));
Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
Assertions.assertSame(Topology.EMPTY, service.current());
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete());
RoutingKeys keys = keys(150).toUnseekables();
Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.forSelection(keys, Topology.OnUnknown.REJECT)),
service.withUnsyncedEpochs(keys, 2, 2));
service.onEpochSyncComplete(id(2), 2);
service.onEpochSyncComplete(id(3), 2);
Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT)),
service.withUnsyncedEpochs(keys, 2, 2));
}
/**
* Previous epoch topologies should only be included if they haven't been acknowledged, even
* if the previous epoch is awaiting acknowledgement from all nodes
*/
@Test
void forKeysPartiallySynced()
{
Topology topology1 = topology(1,
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
Topology topology2 = topology(2,
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology1);
service.onTopologyUpdate(topology2);
// no acks, so all epoch 1 shards should be included
Assertions.assertEquals(topologies(topology2, topology1),
service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2));
// first topology acked, so only the second shard should be included
service.onEpochSyncComplete(id(1), 1);
service.onEpochSyncComplete(id(2), 1);
Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2);
Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
actual);
}
@Test
void incompleteTopologyHistory()
{
Topology topology5 = topology(5,
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)));
Topology topology6 = topology(6,
shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)),
shard(range(200, 300), idList(4, 5, 6), idSet(5, 6)));
TopologyManager service = new TopologyManager(SUPPLIER, ID);
service.onTopologyUpdate(topology5);
service.onTopologyUpdate(topology6);
Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global());
Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global());
for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 5);
Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete());
Assertions.assertNull(service.getEpochStateUnsafe(4));
service.onEpochSyncComplete(id(1), 4);
}
private static void markTopologySynced(TopologyManager service, long epoch)
{
service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> service.onEpochSyncComplete(id, epoch));
}
private static void addAndMarkSynced(TopologyManager service, Topology topology)
{
service.onTopologyUpdate(topology);
markTopologySynced(service, topology.epoch());
}
@Test
void truncateTopologyHistory()
{
Range range = range(100, 200);
TopologyManager service = new TopologyManager(SUPPLIER, ID);
addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))));
addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))));
addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2))));
addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), idSet(1, 3))));
Assertions.assertTrue(service.hasEpoch(1));
Assertions.assertTrue(service.hasEpoch(2));
Assertions.assertTrue(service.hasEpoch(3));
Assertions.assertTrue(service.hasEpoch(4));
service.truncateTopologyUntil(3);
Assertions.assertFalse(service.hasEpoch(1));
Assertions.assertFalse(service.hasEpoch(2));
Assertions.assertTrue(service.hasEpoch(3));
Assertions.assertTrue(service.hasEpoch(4));
}
@Test
void truncateTopologyCantTruncateUnsyncedEpochs()
{
}
}