/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.lang.ByteArray.fromString;
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.configuration.MetaStorageConfiguration;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.leases.Lease;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage;
import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
/**
 * Tests for the placement driver manager.
 */
@ExtendWith(ConfigurationExtension.class)
public class PlacementDriverManagerTest extends BasePlacementDriverTest {
public static final int PORT = 1234;
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
private String nodeName;
    /** Another node name. This name corresponds to {@code anotherClusterService}. */
private String anotherNodeName;
private HybridClock nodeClock = new HybridClockImpl();
private ClusterService clusterService;
private LogicalTopologyServiceTestImpl logicalTopologyService;
/** This service is used to redirect a lease proposal. */
private ClusterService anotherClusterService;
private Loza raftManager;
@InjectConfiguration
private RaftConfiguration raftConfiguration;
@InjectConfiguration
private MetaStorageConfiguration metaStorageConfiguration;
private MetaStorageManagerImpl metaStorageManager;
private PlacementDriverManager placementDriverManager;
private TestInfo testInfo;
/** This closure handles {@link LeaseGrantedMessage} to check the placement driver manager behavior. */
private BiFunction<LeaseGrantedMessage, String, LeaseGrantedMessageResponse> leaseGrantHandler;
private final AtomicInteger nextTableId = new AtomicInteger();
@BeforeEach
public void beforeTest(TestInfo testInfo) {
this.nodeName = testNodeName(testInfo, PORT);
this.anotherNodeName = testNodeName(testInfo, PORT + 1);
this.testInfo = testInfo;
        assertTrue(nodeName.hashCode() < anotherNodeName.hashCode(),
                "The node that receives the first lease grant message must be strictly determined.");
startPlacementDriverManager();
}
private void startPlacementDriverManager() {
var nodeFinder = new StaticNodeFinder(Collections.singletonList(new NetworkAddress("localhost", PORT)));
clusterService = ClusterServiceTestUtils.clusterService(testInfo, PORT, nodeFinder);
anotherClusterService = ClusterServiceTestUtils.clusterService(testInfo, PORT + 1, nodeFinder);
anotherClusterService.messagingService().addMessageHandler(
PlacementDriverMessageGroup.class,
leaseGrantMessageHandler(anotherNodeName)
);
clusterService.messagingService().addMessageHandler(PlacementDriverMessageGroup.class, leaseGrantMessageHandler(nodeName));
ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class);
when(cmgManager.metaStorageNodes()).thenReturn(completedFuture(Set.of(clusterService.nodeName())));
RaftGroupEventsClientListener eventsClientListener = new RaftGroupEventsClientListener();
logicalTopologyService = new LogicalTopologyServiceTestImpl(clusterService);
TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
clusterService,
logicalTopologyService,
Loza.FACTORY,
eventsClientListener
);
raftManager = new Loza(
clusterService,
raftConfiguration,
workDir.resolve("loza"),
nodeClock,
eventsClientListener
);
var storage = new SimpleInMemoryKeyValueStorage(nodeName);
metaStorageManager = new MetaStorageManagerImpl(
clusterService,
cmgManager,
logicalTopologyService,
raftManager,
storage,
nodeClock,
topologyAwareRaftGroupServiceFactory,
metaStorageConfiguration
);
placementDriverManager = new PlacementDriverManager(
nodeName,
metaStorageManager,
MetastorageGroupId.INSTANCE,
clusterService,
cmgManager::metaStorageNodes,
logicalTopologyService,
raftManager,
topologyAwareRaftGroupServiceFactory,
new TestClockService(nodeClock)
);
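        // Start the network and Raft components together with Meta storage, wait for Meta storage
        // recovery, then start the placement driver and deploy Meta storage watches.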
assertThat(
startAsync(clusterService, anotherClusterService, raftManager, metaStorageManager)
.thenCompose(unused -> metaStorageManager.recoveryFinishedFuture())
.thenCompose(unused -> placementDriverManager.startAsync())
.thenCompose(unused -> metaStorageManager.notifyRevisionUpdateListenerOnStart())
.thenCompose(unused -> metaStorageManager.deployWatches()),
willCompleteSuccessfully()
);
}
    /**
     * Creates a handler for lease grant messages.
     *
     * @param handlerNode Name of the node that handles the message.
     * @return Message handler that responds to lease grant messages.
     */
private NetworkMessageHandler leaseGrantMessageHandler(String handlerNode) {
return (msg, sender, correlationId) -> {
assert msg instanceof LeaseGrantedMessage : "Message type is unexpected [type=" + msg.getClass().getSimpleName() + ']';
log.info("Lease is being granted [actor={}, recipient={}, force={}]", sender, handlerNode,
((LeaseGrantedMessage) msg).force());
LeaseGrantedMessageResponse resp = null;
if (leaseGrantHandler != null) {
resp = leaseGrantHandler.apply((LeaseGrantedMessage) msg, handlerNode);
}
if (resp == null) {
resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse()
.accepted(true)
.build();
}
clusterService.messagingService().respond(sender, resp, correlationId);
};
}
@AfterEach
public void afterEach() throws Exception {
stopPlacementDriverManager();
}
private void stopPlacementDriverManager() throws Exception {
List<IgniteComponent> igniteComponents = List.of(
placementDriverManager,
metaStorageManager,
raftManager,
clusterService,
anotherClusterService
);
closeAll(Stream.concat(
igniteComponents.stream().filter(Objects::nonNull).map(component -> component::beforeNodeStop),
Stream.of(() -> assertThat(stopAsync(igniteComponents), willCompleteSuccessfully())))
);
}
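    /**
     * Checks that a lease is created for a newly created table assignment.
     */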
@Test
public void testLeaseCreate() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, false);
}
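    /**
     * Checks that an existing lease is renewed: its expiration time moves forward over time.
     */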
@Test
@WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
public void testLeaseRenew() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, false);
var leaseFut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
Lease lease = leaseFromBytes(leaseFut.join().value(), grpPart0);
assertNotNull(lease);
assertTrue(waitForCondition(() -> {
var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
Lease leaseRenew = leaseFromBytes(fut.join().value(), grpPart0);
return lease.getExpirationTime().compareTo(leaseRenew.getExpirationTime()) < 0;
}, 10_000));
}
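    /**
     * Checks that the lease expires when the stable assignments become empty and is negotiated again
     * once non-empty assignments are written back.
     */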
@Test
@WithSystemProperty(key = "IGNITE_LONG_LEASE", value = "200")
public void testLeaseholderUpdate() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, false);
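        // With no assignments there is no candidate leaseholder, so the lease is not prolonged and eventually expires.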
Set<Assignment> assignments = Set.of();
metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + grpPart0), Assignments.toBytes(assignments));
assertTrue(waitForCondition(() -> {
var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
Lease lease = leaseFromBytes(fut.join().value(), grpPart0);
return lease.getExpirationTime().compareTo(nodeClock.now()) < 0;
}, 10_000));
assignments = calculateAssignmentForPartition(Collections.singleton(nodeName), 1, 1);
metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + grpPart0), Assignments.toBytes(assignments));
assertTrue(waitForCondition(() -> {
var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
Lease lease = leaseFromBytes(fut.join().value(), grpPart0);
return lease.getExpirationTime().compareTo(nodeClock.now()) > 0;
}, 10_000));
}
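    /**
     * Checks that {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} and {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED}
     * events are fired when the leaseholder moves to another node and when that node is restarted.
     */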
@Test
public void testPrimaryReplicaEvents() throws Exception {
TablePartitionId grpPart0 = createTableAssignment(metaStorageManager, nextTableId.incrementAndGet(), List.of(nodeName));
Lease lease1 = checkLeaseCreated(grpPart0, true);
ConcurrentHashMap<String, HybridTimestamp> electedEvts = new ConcurrentHashMap<>(2);
ConcurrentHashMap<String, HybridTimestamp> expiredEvts = new ConcurrentHashMap<>(2);
placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, evt -> {
log.info("Primary replica is elected [grp={}]", evt.groupId());
electedEvts.put(evt.leaseholderId(), evt.startTime());
return falseCompletedFuture();
});
placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, evt -> {
log.info("Primary replica is expired [grp={}]", evt.groupId());
expiredEvts.put(evt.leaseholderId(), evt.startTime());
return falseCompletedFuture();
});
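        // Move the stable assignments to the other node: the current lease should expire and a new one
        // should be negotiated with the new leaseholder.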
Set<Assignment> assignments = calculateAssignmentForPartition(Collections.singleton(anotherNodeName), 1, 1);
metaStorageManager.put(fromString(STABLE_ASSIGNMENTS_PREFIX + grpPart0), Assignments.toBytes(assignments));
assertTrue(waitForCondition(() -> {
CompletableFuture<ReplicaMeta> fut = placementDriverManager.placementDriver()
.getPrimaryReplica(grpPart0, lease1.getExpirationTime());
ReplicaMeta meta = fut.join();
return meta != null && meta.getLeaseholder().equals(anotherNodeName);
}, 10_000));
Lease lease2 = checkLeaseCreated(grpPart0, true);
assertNotEquals(lease1.getLeaseholder(), lease2.getLeaseholder());
assertEquals(1, electedEvts.size());
assertEquals(1, expiredEvts.size());
assertTrue(electedEvts.containsKey(lease2.getLeaseholderId()));
assertTrue(expiredEvts.containsKey(lease1.getLeaseholderId()));
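        // Restart the leaseholder node. The restarted node joins with a new node id, so the previous lease
        // expires and a new one is negotiated.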
stopAnotherNode(anotherClusterService);
anotherClusterService = startAnotherNode(anotherNodeName, PORT + 1);
assertTrue(waitForCondition(() -> {
CompletableFuture<ReplicaMeta> fut = placementDriverManager.placementDriver()
.getPrimaryReplica(grpPart0, lease2.getExpirationTime());
ReplicaMeta meta = fut.join();
return meta != null && meta.getLeaseholderId().equals(anotherClusterService.topologyService().localMember().id());
}, 10_000));
Lease lease3 = checkLeaseCreated(grpPart0, true);
assertEquals(2, electedEvts.size());
assertEquals(2, expiredEvts.size());
assertTrue(electedEvts.containsKey(lease3.getLeaseholderId()));
assertTrue(expiredEvts.containsKey(lease2.getLeaseholderId()));
}
    /**
     * Stops another node.
     *
     * @param nodeClusterService Cluster service of the node to stop.
     * @throws Exception If failed.
     */
private void stopAnotherNode(ClusterService nodeClusterService) throws Exception {
nodeClusterService.beforeNodeStop();
assertThat(nodeClusterService.stopAsync(), willCompleteSuccessfully());
assertTrue(waitForCondition(
() -> !clusterService.topologyService().allMembers().contains(nodeClusterService.topologyService().localMember()),
10_000
));
logicalTopologyService.updateTopology();
}
/**
* Starts another node.
*
* @param nodeName Node name.
* @param port Node port.
* @return Cluster service for the newly started node.
* @throws Exception If failed.
*/
private ClusterService startAnotherNode(String nodeName, int port) throws Exception {
ClusterService nodeClusterService = ClusterServiceTestUtils.clusterService(
testInfo,
port,
new StaticNodeFinder(Collections.singletonList(new NetworkAddress("localhost", PORT)))
);
nodeClusterService.messagingService().addMessageHandler(
PlacementDriverMessageGroup.class,
leaseGrantMessageHandler(nodeName)
);
assertThat(nodeClusterService.startAsync(), willCompleteSuccessfully());
assertTrue(waitForCondition(
() -> clusterService.topologyService().allMembers().contains(nodeClusterService.topologyService().localMember()),
10_000
));
logicalTopologyService.updateTopology();
return nodeClusterService;
}
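    /**
     * Checks that removing the stable assignments of a group removes only that group's lease.
     */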
@Test
    public void testLeaseRemovedAfterExpirationAndAssignmentsRemoval() throws Exception {
List<TablePartitionId> groupIds = List.of(
createTableAssignment(metaStorageManager, nextTableId.incrementAndGet(), List.of(nodeName)),
createTableAssignment(metaStorageManager, nextTableId.incrementAndGet(), List.of(nodeName))
);
Map<TablePartitionId, AtomicBoolean> leaseExpirationMap =
groupIds.stream().collect(Collectors.toMap(id -> id, id -> new AtomicBoolean()));
groupIds.forEach(groupId -> {
placementDriverManager.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, evt -> {
log.info("Primary replica is expired [grp={}]", groupId);
leaseExpirationMap.get(groupId).set(true);
return falseCompletedFuture();
});
});
checkLeaseCreated(groupIds.get(0), true);
checkLeaseCreated(groupIds.get(1), true);
assertFalse(leaseExpirationMap.get(groupIds.get(0)).get());
assertFalse(leaseExpirationMap.get(groupIds.get(1)).get());
metaStorageManager.remove(fromString(STABLE_ASSIGNMENTS_PREFIX + groupIds.get(0)));
assertTrue(waitForCondition(() -> {
var fut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
            // Only the lease of groupIds.get(0), whose assignments were removed, should be gone.
return leaseFromBytes(fut.join().value(), groupIds.get(0)) == null
&& leaseFromBytes(fut.join().value(), groupIds.get(1)) != null;
}, 10_000));
}
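    /**
     * Checks that a lease is negotiated and accepted by the proposed leaseholder.
     */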
@Test
public void testLeaseAccepted() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, true);
}
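    /**
     * Checks that a lease is eventually accepted when the leaseholder accepts only forced lease grant messages.
     */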
@Test
public void testLeaseForceAccepted() throws Exception {
leaseGrantHandler = (req, handler) ->
PLACEMENT_DRIVER_MESSAGES_FACTORY
.leaseGrantedMessageResponse()
.accepted(req.force())
.build();
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, true);
}
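    /**
     * Checks that a lease is created but stays unaccepted when the leaseholder fails to handle the lease grant message.
     */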
@Test
public void testExceptionOnAcceptance() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
leaseGrantHandler = (req, handler) -> {
latch.countDown();
throw new RuntimeException("test");
};
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, false);
latch.await();
Lease lease = checkLeaseCreated(grpPart0, false);
assertFalse(lease.isAccepted());
}
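    /**
     * Checks that a lease is accepted after the first proposed leaseholder redirects the proposal to the other node.
     */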
@Test
public void testRedirectionAcceptance() throws Exception {
AtomicReference<String> redirect = new AtomicReference<>();
leaseGrantHandler = (req, handler) -> {
if (redirect.get() == null) {
redirect.set(handler.equals(nodeName) ? anotherNodeName : nodeName);
return PLACEMENT_DRIVER_MESSAGES_FACTORY
.leaseGrantedMessageResponse()
.accepted(false)
.redirectProposal(redirect.get())
.build();
} else {
return PLACEMENT_DRIVER_MESSAGES_FACTORY
.leaseGrantedMessageResponse()
.accepted(redirect.get().equals(handler))
.build();
}
};
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, true);
}
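    /**
     * Checks that a lease is restored from Meta storage after the placement driver manager is restarted.
     */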
@Test
public void testLeaseRestore() throws Exception {
TablePartitionId grpPart0 = createTableAssignment();
checkLeaseCreated(grpPart0, false);
stopPlacementDriverManager();
startPlacementDriverManager();
checkLeaseCreated(grpPart0, false);
}
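    /**
     * Checks that the start and expiration times in the lease grant message are consistent with the lease
     * persisted in Meta storage.
     */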
@Test
public void testLeaseMatchGrantMessage() throws Exception {
var leaseGrantReqRef = new AtomicReference<LeaseGrantedMessage>();
leaseGrantHandler = (req, handler) -> {
leaseGrantReqRef.set(req);
return null;
};
TablePartitionId grpPart0 = createTableAssignment();
Lease lease = checkLeaseCreated(grpPart0, false);
assertTrue(waitForCondition(() -> leaseGrantReqRef.get() != null, 10_000));
assertEquals(leaseGrantReqRef.get().leaseStartTime(), lease.getStartTime());
assertTrue(leaseGrantReqRef.get().leaseExpirationTime().compareTo(lease.getExpirationTime()) >= 0);
}
    /**
     * Checks that a lease for the given group is created within the timeout.
     *
     * @param grpPartId Replication group id.
     * @param waitAccept Whether to wait for a lease that has the accepted flag set.
     * @return The lease read from Meta storage.
     * @throws InterruptedException If the waiting is interrupted.
     */
private Lease checkLeaseCreated(TablePartitionId grpPartId, boolean waitAccept) throws InterruptedException {
AtomicReference<Lease> leaseRef = new AtomicReference<>();
assertTrue(waitForCondition(() -> {
var leaseFut = metaStorageManager.get(PLACEMENTDRIVER_LEASES_KEY);
var leaseEntry = leaseFut.join();
if (leaseEntry != null && !leaseEntry.empty()) {
Lease lease = leaseFromBytes(leaseEntry.value(), grpPartId);
if (lease == null) {
return false;
}
                if (!waitAccept || lease.isAccepted()) {
                    leaseRef.set(lease);
                }
}
return leaseRef.get() != null;
}, 10_000));
return leaseRef.get();
}
    /**
     * Creates an assignment for a fake table.
     *
     * @return Replication group id.
     */
private TablePartitionId createTableAssignment() {
return createTableAssignment(metaStorageManager, nextTableId.incrementAndGet(), List.of(nodeName, anotherNodeName));
}
/**
* Test implementation of {@link LogicalTopologyService}.
*/
protected static class LogicalTopologyServiceTestImpl implements LogicalTopologyService {
        private final ClusterService clusterService;
        private final List<LogicalTopologyEventListener> listeners = new ArrayList<>();
        private int ver = 1;
        public LogicalTopologyServiceTestImpl(ClusterService clusterService) {
            this.clusterService = clusterService;
        }
@Override
public void addEventListener(LogicalTopologyEventListener listener) {
this.listeners.add(listener);
}
@Override
public void removeEventListener(LogicalTopologyEventListener listener) {
this.listeners.remove(listener);
}
        /**
         * Propagates the current physical topology to the logical topology listeners.
         */
        public void updateTopology() {
            var topologySnapshot = new LogicalTopologySnapshot(
                    ++ver,
                    clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet())
            );
            listeners.forEach(lnsr -> lnsr.onTopologyLeap(topologySnapshot));
        }
@Override
public CompletableFuture<LogicalTopologySnapshot> logicalTopologyOnLeader() {
return completedFuture(new LogicalTopologySnapshot(
ver,
clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet()))
);
}
@Override
public LogicalTopologySnapshot localLogicalTopology() {
return new LogicalTopologySnapshot(
ver,
clusterService.topologyService().allMembers().stream().map(LogicalNode::new).collect(toSet()));
}
@Override
public CompletableFuture<Set<ClusterNode>> validatedNodesOnLeader() {
return completedFuture(Set.copyOf(clusterService.topologyService().allMembers()));
}
}
}