| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.cache.CacheMode; |
| import org.apache.ignite.cluster.ClusterMetrics; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.Test; |
| |
| import static java.util.concurrent.TimeUnit.MINUTES; |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.lang.IgniteProductVersion.fromString; |
| |
| /** |
| * GridDiscovery self test. |
| */ |
| public class GridDiscoverySelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static Ignite ignite; |
| |
| /** Nodes count. */ |
| private static final int NODES_CNT = 5; |
| |
| /** Maximum timeout when remote nodes join/left the topology */ |
| private static final int MAX_TIMEOUT_IN_MINS = 5; |
| |
| /** */ |
| public GridDiscoverySelfTest() { |
| super(/*start grid*/true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| CacheConfiguration cacheCfg = defaultCacheConfiguration(); |
| |
| //cacheCfg.setName(null); |
| cacheCfg.setCacheMode(CacheMode.PARTITIONED); |
| |
| cfg.setCacheConfiguration(cacheCfg); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTestsStarted() throws Exception { |
| ignite = G.ignite(getTestIgniteInstanceName()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTestsStopped() throws Exception { |
| super.afterTestsStopped(); |
| |
| ignite = null; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetRemoteNodes() throws Exception { |
| Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes(); |
| |
| printNodes(nodes); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetAllNodes() throws Exception { |
| Collection<ClusterNode> nodes = ignite.cluster().nodes(); |
| |
| printNodes(nodes); |
| |
| assert nodes != null; |
| assert !nodes.isEmpty(); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testGetLocalNode() throws Exception { |
| ClusterNode node = ignite.cluster().localNode(); |
| |
| assert node != null; |
| |
| Collection<ClusterNode> nodes = ignite.cluster().nodes(); |
| |
| assert nodes != null; |
| assert nodes.contains(node); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPingNode() throws Exception { |
| ClusterNode node = ignite.cluster().localNode(); |
| |
| assert node != null; |
| |
| boolean pingRes = ignite.cluster().pingNode(node.id()); |
| |
| assert pingRes : "Failed to ping local node."; |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDiscoveryListener() throws Exception { |
| ClusterNode node = ignite.cluster().localNode(); |
| |
| assert node != null; |
| |
| final AtomicInteger cnt = new AtomicInteger(); |
| |
| // Joined nodes counter. |
| final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT); |
| |
| // Left nodes counter. |
| final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT); |
| |
| IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { |
| @Override public boolean apply(Event evt) { |
| if (EVT_NODE_JOINED == evt.type()) { |
| cnt.incrementAndGet(); |
| |
| joinedCnt.countDown(); |
| } |
| else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) { |
| int i = cnt.decrementAndGet(); |
| |
| assert i >= 0; |
| |
| leftCnt.countDown(); |
| } |
| else |
| assert false; |
| |
| return true; |
| } |
| }; |
| |
| int[] evts = tcpDiscovery() ? new int[]{EVT_NODE_LEFT, EVT_NODE_JOINED} : |
| new int[]{EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED}; |
| |
| ignite.events().localListen(lsnr, evts); |
| |
| try { |
| for (int i = 0; i < NODES_CNT; i++) |
| startGrid(i); |
| |
| joinedCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); |
| |
| assert cnt.get() == NODES_CNT; |
| |
| for (int i = 0; i < NODES_CNT; i++) |
| stopGrid(i); |
| |
| leftCnt.await(MAX_TIMEOUT_IN_MINS, MINUTES); |
| |
| assert cnt.get() == 0; |
| |
| ignite.events().stopLocalListen(lsnr); |
| |
| assert cnt.get() == 0; |
| } |
| finally { |
| for (int i = 0; i < NODES_CNT; i++) |
| stopAndCancelGrid(i); |
| } |
| } |
| |
| /** |
| * Test cache nodes resolved correctly from topology history. |
| * |
| * @throws Exception In case of any exception. |
| */ |
| @Test |
| public void testCacheNodes() throws Exception { |
| // Validate only original node is available. |
| GridDiscoveryManager discoMgr = ((IgniteKernal)ignite).context().discovery(); |
| |
| Collection<ClusterNode> nodes = discoMgr.allNodes(); |
| |
| assert nodes.size() == 1 : "Expects only original node is available: " + nodes; |
| |
| final long topVer0 = discoMgr.topologyVersion(); |
| |
| assert topVer0 > 0 : "Unexpected initial topology version: " + topVer0; |
| |
| List<UUID> uuids = new ArrayList<>(NODES_CNT); |
| |
| UUID locId = ignite.cluster().localNode().id(); |
| |
| try { |
| // Start nodes. |
| for (int i = 0; i < NODES_CNT; i++) |
| uuids.add(startGrid(i).cluster().localNode().id()); |
| |
| // Stop nodes. |
| for (int i = 0; i < NODES_CNT; i++) |
| stopGrid(i); |
| |
| waitForTopology(1); |
| |
| final long topVer = discoMgr.topologyVersion(); |
| |
| assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer; |
| |
| for (long ver = topVer0; ver <= topVer; ver++) { |
| Collection<UUID> exp = new ArrayList<>(); |
| |
| exp.add(locId); |
| |
| for (int i = 0; i < NODES_CNT && i < ver - topVer0; i++) |
| exp.add(uuids.get(i)); |
| |
| for (int i = 0; i < ver - topVer0 - NODES_CNT; i++) |
| exp.remove(uuids.get(i)); |
| |
| // Cache nodes by topology version (e.g. NODE_CNT == 3). |
| // 0 1 2 3 (node id) |
| // 1 (topVer) + - only local node |
| // 2 + + |
| // 3 + + + |
| // 4 + + + + |
| // 5 + + + |
| // 6 + + |
| // 7 + - only local node |
| |
| Collection<ClusterNode> cacheNodes = discoMgr.cacheNodes(DEFAULT_CACHE_NAME, new AffinityTopologyVersion(ver)); |
| |
| Collection<UUID> act = new ArrayList<>(F.viewReadOnly(cacheNodes, new C1<ClusterNode, UUID>() { |
| @Override public UUID apply(ClusterNode n) { |
| return n.id(); |
| } |
| })); |
| |
| assertEquals("Expects correct cache nodes for topology version: " + ver, exp, act); |
| } |
| } |
| finally { |
| for (int i = 0; i < NODES_CNT; i++) |
| stopAndCancelGrid(i); |
| } |
| } |
| |
| /** |
| * @param nodes Nodes. |
| */ |
| private void printNodes(Collection<ClusterNode> nodes) { |
| StringBuilder buf = new StringBuilder(); |
| |
| if (nodes != null && !nodes.isEmpty()) { |
| buf.append("Found nodes [nodes={"); |
| |
| int i = 0; |
| |
| for (Iterator<ClusterNode> iter = nodes.iterator(); iter.hasNext(); i++) { |
| ClusterNode node = iter.next(); |
| |
| buf.append(node.id()); |
| |
| if (i + 1 != nodes.size()) |
| buf.append(", "); |
| } |
| |
| buf.append("}]"); |
| } |
| else |
| buf.append("Found no nodes."); |
| |
| if (log().isDebugEnabled()) |
| log().debug(buf.toString()); |
| } |
| |
| /** |
| * |
| */ |
| private static class GridDiscoveryTestNode extends GridMetadataAwareAdapter implements ClusterNode { |
| /** */ |
| private static AtomicInteger consistentIdCtr = new AtomicInteger(); |
| |
| /** */ |
| private UUID nodeId = UUID.randomUUID(); |
| |
| /** */ |
| private Object consistentId = consistentIdCtr.incrementAndGet(); |
| |
| /** {@inheritDoc} */ |
| @Override public long order() { |
| return -1; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteProductVersion version() { |
| return fromString("99.99.99"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public UUID id() { |
| return nodeId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Object consistentId() { |
| return consistentId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public <T> T attribute(String name) { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public ClusterMetrics metrics() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Map<String, Object> attributes() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<String> addresses() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isLocal() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDaemon() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isClient() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<String> hostNames() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| return F.eqNodes(this, o); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return id().hashCode(); |
| } |
| } |
| } |