IGNITE-16182 Migrate TcpDiscoveryZookeeperIpFinder to ignite-extensions. (#9682)
(cherry picked from commit f8c40271eeb915c4728f0baa6a1e4196c48f5b33)
diff --git a/modules/control-utility/pom.xml b/modules/control-utility/pom.xml
index e0874d9..5a7f787 100644
--- a/modules/control-utility/pom.xml
+++ b/modules/control-utility/pom.xml
@@ -102,6 +102,13 @@
</dependency>
<dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
diff --git a/modules/zookeeper/pom.xml b/modules/zookeeper/pom.xml
index 771fb6e..392ea37 100644
--- a/modules/zookeeper/pom.xml
+++ b/modules/zookeeper/pom.xml
@@ -42,50 +42,6 @@
</dependency>
<dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>${curator.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-x-discovery</artifactId>
- <version>${curator.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>${curator.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava16.version}</version>
- </dependency>
-
- <!-- Do not remove org.codehaus.jackson:jackson-core-asl it is required by Apache Curator at runtime -->
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- <version>${jackson1.version}</version>
- </dependency>
-
- <!-- Do not remove org.codehaus.jackson:jackson-mapper-asl it is required by Apache Curator at runtime -->
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- <version>${jackson1.version}</version>
- </dependency>
-
- <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
@@ -107,11 +63,20 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
deleted file mode 100644
index ad0821b..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/TcpDiscoveryZookeeperIpFinder.java
+++ /dev/null
@@ -1,398 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
-
-import java.net.InetSocketAddress;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.collect.Sets;
-import org.apache.curator.RetryPolicy;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.imps.CuratorFrameworkState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.x.discovery.ServiceDiscovery;
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.UriSpec;
-import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.SystemProperty;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.LoggerResource;
-import org.apache.ignite.spi.IgniteSpiException;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter;
-import org.codehaus.jackson.map.annotate.JsonRootName;
-
-/**
- * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join
- * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically,
- * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services.
- *
- * <p>
- * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder:
- * <li>
- * <ul>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String
- * is required.</ul>
- * <ul>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally
- * a {@link RetryPolicy} through the setter. If the latter is not provided, a default
- * {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</ul>
- * <ul>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. If this
- * property is set, it overrides the ZK Connection String passed in as a property, but it does not override
- * the {@link CuratorFramework} if provided.</ul>
- * </li>
- *
- * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and
- * {@link #SERVICE_NAME} are use respectively. You can also choose to enable or disable duplicate registrations. See
- * {@link #setAllowDuplicateRegistrations(boolean)} for more details.
- *
- * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a>
- * @see <a href="http://curator.apache.org">Apache Curator</a>
- */
-public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter {
- /** System property name to provide the ZK Connection String. */
- @SystemProperty(value = "Zookeeper connection string", type = String.class)
- public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING";
-
- /** Default base path for service registrations. */
- private static final String BASE_PATH = "/services";
-
- /** Default service name for service registrations. */
- private static final String SERVICE_NAME = "ignite";
-
- /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */
- private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}");
-
- /** Init guard. */
- @GridToStringExclude
- private final AtomicBoolean initGuard = new AtomicBoolean();
-
- /** Init guard. */
- @GridToStringExclude
- private final AtomicBoolean closeGuard = new AtomicBoolean();
-
- /** Logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** The Curator framework in use, either injected or constructed by this component. */
- private CuratorFramework curator;
-
- /** The ZK Connection String if provided by the user. */
- private String zkConnectionString;
-
- /** Retry policy to use. */
- private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
-
- /** Base path to use, by default {#link #BASE_PATH}. */
- private String basePath = BASE_PATH;
-
- /** Service name to use, by default {#link #SERVICE_NAME}. */
- private String serviceName = SERVICE_NAME;
-
- /** Whether to allow or not duplicate registrations. See setter doc. */
- private boolean allowDuplicateRegistrations;
-
- /** The Service Discovery recipe. */
- private ServiceDiscovery<IgniteInstanceDetails> discovery;
-
- /** Map of the {#link ServiceInstance}s we have registered. */
- private Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances = new ConcurrentHashMap<>();
-
- /** Constructor. */
- public TcpDiscoveryZookeeperIpFinder() {
- setShared(true);
- }
-
- /** Initializes this IP Finder by creating the appropriate Curator objects. */
- private void init() {
- if (!initGuard.compareAndSet(false, true))
- return;
-
- String sysPropZkConnString = IgniteSystemProperties.getString(PROP_ZK_CONNECTION_STRING);
-
- if (sysPropZkConnString != null && !sysPropZkConnString.trim().isEmpty())
- zkConnectionString = sysPropZkConnString;
-
- if (log.isInfoEnabled())
- log.info("Initializing ZooKeeper IP Finder.");
-
- if (curator == null) {
- A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be null " +
- "or empty if a CuratorFramework object is not provided explicitly", PROP_ZK_CONNECTION_STRING));
- curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy);
- }
-
- if (curator.getState() == CuratorFrameworkState.LATENT)
- curator.start();
-
- A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started.");
-
- discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class)
- .client(curator)
- .basePath(basePath)
- .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class))
- .build();
- }
-
- /** {@inheritDoc} */
- @Override public void onSpiContextDestroyed() {
- if (!closeGuard.compareAndSet(false, true)) {
- U.warn(log, "ZooKeeper IP Finder can't be closed more than once.");
-
- return;
- }
-
- if (log.isInfoEnabled())
- log.info("Destroying ZooKeeper IP Finder.");
-
- super.onSpiContextDestroyed();
-
- if (curator != null)
- curator.close();
-
- }
-
- /** {@inheritDoc} */
- @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
- init();
-
- if (log.isDebugEnabled())
- log.debug("Getting registered addresses from ZooKeeper IP Finder.");
-
- Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances;
-
- try {
- serviceInstances = discovery.queryForInstances(serviceName);
- }
- catch (Exception e) {
- log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e);
- return Collections.emptyList();
- }
-
- Set<InetSocketAddress> answer = new HashSet<>();
-
- for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances)
- answer.add(new InetSocketAddress(si.getAddress(), si.getPort()));
-
- if (log.isInfoEnabled())
- log.info("ZooKeeper IP Finder resolved addresses: " + answer);
-
- return answer;
- }
-
- /** {@inheritDoc} */
- @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
- init();
-
- if (log.isInfoEnabled())
- log.info("Registering addresses with ZooKeeper IP Finder: " + addrs);
-
- Set<InetSocketAddress> registrationsToIgnore = Sets.newHashSet();
- if (!allowDuplicateRegistrations) {
- try {
- for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName))
- registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort()));
- }
- catch (Exception e) {
- log.warning("Error while finding currently registered services to avoid duplicate registrations", e);
- throw new IgniteSpiException(e);
- }
- }
-
- for (InetSocketAddress addr : addrs) {
- if (registrationsToIgnore.contains(addr))
- continue;
-
- try {
- ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder()
- .name(serviceName)
- .uriSpec(URI_SPEC)
- .address(addr.getAddress().getHostAddress())
- .port(addr.getPort())
- .build();
-
- ourInstances.put(addr, si);
-
- discovery.registerService(si);
-
- }
- catch (Exception e) {
- log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " +
- "[message=%s,addresses=%s]", e.getMessage(), addr), e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
-
- // if curator is not STARTED, we have nothing to unregister, because we are using ephemeral nodes,
- // which means that our addresses will only be registered in ZK as long as our connection is alive
- if (curator.getState() != CuratorFrameworkState.STARTED)
- return;
-
- if (log.isInfoEnabled())
- log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs);
-
- for (InetSocketAddress addr : addrs) {
- ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr);
- if (si == null) {
- log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " +
- "instance map for: " + addrs);
- continue;
- }
-
- try {
- discovery.unregisterService(si);
- }
- catch (Exception e) {
- log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e);
- }
- }
- }
-
- /**
- * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setCurator(CuratorFramework curator) {
- this.curator = curator;
-
- return this;
- }
-
- /**
- * @return The ZooKeeper connection string, only if set explicitly. Else, it returns null.
- */
- public String getZkConnectionString() {
- return zkConnectionString;
- }
-
- /**
- * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set
- * explicitly.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setZkConnectionString(String zkConnectionString) {
- this.zkConnectionString = zkConnectionString;
-
- return this;
- }
-
- /**
- * @return Retry policy in use if, and only if, it was set explicitly. Else, it returns null.
- */
- public RetryPolicy getRetryPolicy() {
- return retryPolicy;
- }
-
- /**
- * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a
- * system property.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setRetryPolicy(RetryPolicy retryPolicy) {
- this.retryPolicy = retryPolicy;
-
- return this;
- }
-
- /**
- * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}.
- */
- public String getBasePath() {
- return basePath;
- }
-
- /**
- * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setBasePath(String basePath) {
- this.basePath = basePath;
-
- return this;
- }
-
- /**
- * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information.
- */
- public String getServiceName() {
- return serviceName;
- }
-
- /**
- * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical ZK
- * terms, it represents the node under {@link #basePath}, under which services will be registered.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setServiceName(String serviceName) {
- this.serviceName = serviceName;
-
- return this;
- }
-
- /**
- * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details.
- */
- public boolean isAllowDuplicateRegistrations() {
- return allowDuplicateRegistrations;
- }
-
- /**
- * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are
- * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate
- * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple
- * network interfaces or if you are facing troubles.
- * @return {@code this} for chaining.
- */
- public TcpDiscoveryZookeeperIpFinder setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
- this.allowDuplicateRegistrations = allowDuplicateRegistrations;
-
- return this;
- }
-
- /** {@inheritDoc} */
- @Override public TcpDiscoveryZookeeperIpFinder setShared(boolean shared) {
- super.setShared(shared);
-
- return this;
- }
-
- /**
- * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a
- * payload type when registering and discovering nodes. May be enhanced in the future with further information to
- * assist discovery.
- *
- * @author Raul Kripalani
- */
- @JsonRootName("ignite_instance_details")
- private class IgniteInstanceDetails {
-
- }
-
-}
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/package-info.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/package-info.java
deleted file mode 100644
index fcfc7b9..0000000
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes.
- */
-
-package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
deleted file mode 100644
index 517b8b7..0000000
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
-
-import java.util.Collection;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.curator.test.InstanceSpec;
-import org.apache.curator.test.TestingCluster;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.events.Event;
-import org.apache.ignite.events.EventType;
-import org.apache.ignite.lang.IgniteBiPredicate;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-/**
- * Test for {@link TcpDiscoveryZookeeperIpFinder}.
- *
- * @author Raul Kripalani
- */
-@WithSystemProperty(key = "zookeeper.jmx.log4j.disable", value = "true") // disable JMX for tests
-public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
- /** Per test timeout */
- @Rule
- public Timeout globalTimeout = new Timeout((int) GridTestUtils.DFLT_TEST_TIMEOUT);
-
- /** ZK Cluster size. */
- private static final int ZK_CLUSTER_SIZE = 3;
-
- /** ZK Path size. */
- private static final String SERVICES_IGNITE_ZK_PATH = "/services/ignite";
-
- /** The ZK cluster instance, from curator-test. */
- private TestingCluster zkCluster;
-
- /** A Curator client to perform assertions on the embedded ZK instances. */
- private CuratorFramework zkCurator;
-
- /** Whether to allow duplicate registrations for the current test method or not. */
- private boolean allowDuplicateRegistrations = false;
-
- /** Constructor that does not start any grids. */
- public ZookeeperIpFinderTest() {
- super(false);
- }
-
- /**
- * Before test.
- *
- * @throws Exception
- */
- @Override public void beforeTest() throws Exception {
- super.beforeTest();
-
- // remove stale system properties
- System.getProperties().remove(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING);
-
- // start the ZK cluster
- zkCluster = new TestingCluster(ZK_CLUSTER_SIZE);
-
- zkCluster.start();
-
- // start the Curator client so we can perform assertions on the ZK state later
- zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
- zkCurator.start();
- }
-
- /**
- * After test.
- *
- * @throws Exception
- */
- @Override public void afterTest() throws Exception {
- super.afterTest();
-
- if (zkCurator != null)
- CloseableUtils.closeQuietly(zkCurator);
-
- if (zkCluster != null)
- CloseableUtils.closeQuietly(zkCluster);
-
- stopAllGrids();
- }
-
- /**
- * Enhances the default configuration with the {#TcpDiscoveryZookeeperIpFinder}.
- *
- * @param igniteInstanceName Ignite instance name.
- * @return Ignite configuration.
- * @throws Exception If failed.
- */
- @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
- IgniteConfiguration configuration = super.getConfiguration(igniteInstanceName);
-
- TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi)configuration.getDiscoverySpi();
- TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder();
- zkIpFinder.setAllowDuplicateRegistrations(allowDuplicateRegistrations);
-
- // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
- // shall be configured through system property
- if (igniteInstanceName.equals(getTestIgniteInstanceName(0)))
- zkIpFinder.setZkConnectionString(zkCluster.getConnectString());
-
- else if (igniteInstanceName.equals(getTestIgniteInstanceName(1))) {
- zkIpFinder.setCurator(CuratorFrameworkFactory.newClient(zkCluster.getConnectString(),
- new ExponentialBackoffRetry(100, 5)));
- }
-
- tcpDisco.setIpFinder(zkIpFinder);
-
- return configuration;
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testOneIgniteNodeIsAlone() throws Exception {
- startGrid(0);
-
- assertEquals(1, grid(0).cluster().metrics().getTotalNodes());
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testTwoIgniteNodesFindEachOther() throws Exception {
- // start one node
- startGrid(0);
-
- // set up an event listener to expect one NODE_JOINED event
- CountDownLatch latch = expectJoinEvents(grid(0), 1);
-
- // start the other node
- startGrid(1);
-
- // assert the nodes see each other
- assertEquals(2, grid(0).cluster().metrics().getTotalNodes());
- assertEquals(2, grid(1).cluster().metrics().getTotalNodes());
-
- // assert the event listener got as many events as expected
- latch.await(1, TimeUnit.SECONDS);
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception {
- // start one node
- startGrid(0);
-
- // set up an event listener to expect one NODE_JOINED event
- CountDownLatch latch = expectJoinEvents(grid(0), 2);
-
- // start the 2nd node
- startGrid(1);
-
- // start the 3rd node, first setting the system property
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrid(2);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 2);
-
- // assert the nodes see each other
- assertEquals(3, grid(0).cluster().metrics().getTotalNodes());
- assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
- assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
-
- // assert the event listener got as many events as expected
- latch.await(1, TimeUnit.SECONDS);
-
- stopAllGrids();
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testFourNodesStartingAndStopping() throws Exception {
- // start one node
- startGrid(0);
-
- // set up an event listener to expect one NODE_JOINED event
- CountDownLatch latch = expectJoinEvents(grid(0), 3);
-
- // start the 2nd node
- startGrid(1);
-
- // start the 3rd & 4th nodes, first setting the system property
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrid(2);
- startGrid(3);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 3);
-
- // assert the nodes see each other
- assertEquals(4, grid(0).cluster().metrics().getTotalNodes());
- assertEquals(4, grid(1).cluster().metrics().getTotalNodes());
- assertEquals(4, grid(2).cluster().metrics().getTotalNodes());
- assertEquals(4, grid(3).cluster().metrics().getTotalNodes());
-
- // assert the event listener got as many events as expected
- latch.await(1, TimeUnit.SECONDS);
-
- // stop the first grid
- stopGrid(0);
-
- // make sure that nodes were synchronized; they should only see 3 now
- assertEquals(3, grid(1).cluster().metrics().getTotalNodes());
- assertEquals(3, grid(2).cluster().metrics().getTotalNodes());
- assertEquals(3, grid(3).cluster().metrics().getTotalNodes());
-
- // stop all remaining grids
- stopGrid(1);
- stopGrid(2);
- stopGrid(3);
-
- // check that the nodes are gone in ZK
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testFourNodesWithDuplicateRegistrations() throws Exception {
- allowDuplicateRegistrations = true;
-
- // start 4 nodes
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrids(4);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 3);
-
- // each node will register itself + the node that it connected to to join the cluster
- assertEquals(7, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // stop all grids
- stopAllGrids();
-
- // check that all nodes are gone in ZK
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testFourNodesWithNoDuplicateRegistrations() throws Exception {
- allowDuplicateRegistrations = false;
-
- // start 4 nodes
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrids(4);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 3);
-
- // each node will only register itself
- assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // stop all grids
- stopAllGrids();
-
- // check that all nodes are gone in ZK
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testFourNodesRestartLastSeveralTimes() throws Exception {
- allowDuplicateRegistrations = false;
-
- // start 4 nodes
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrids(4);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 3);
-
- // each node will only register itself
- assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // repeat 5 times
- for (int i = 0; i < 5; i++) {
- // stop last grid
- stopGrid(2);
-
- // check that the node has unregistered itself and its party
- assertEquals(3, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // start the node again
- startGrid(2);
-
- // check that the node back in ZK
- assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- stopAllGrids();
-
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- /**
- * @throws Exception If failed.
- */
- @Test
- public void testFourNodesKillRestartZookeeper() throws Exception {
- allowDuplicateRegistrations = false;
-
- // start 4 nodes
- System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
- startGrids(4);
-
- // wait until all grids are started
- waitForRemoteNodes(grid(0), 3);
-
- // each node will only register itself
- assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // remember ZK server configuration and stop the cluster
- Collection<InstanceSpec> instances = zkCluster.getInstances();
- zkCluster.stop();
- Thread.sleep(1000);
-
- // start the cluster with the previous configuration
- zkCluster = new TestingCluster(instances);
- zkCluster.start();
-
- // block the client until connected
- zkCurator.blockUntilConnected();
-
- // Check that the nodes have registered again with the previous configuration.
- assertEquals(4, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
- // Block the clients until connected.
- for (int i = 0; i < 4; i++) {
- TcpDiscoverySpi spi = (TcpDiscoverySpi)grid(i).configuration().getDiscoverySpi();
-
- TcpDiscoveryZookeeperIpFinder zkIpFinder = (TcpDiscoveryZookeeperIpFinder)spi.getIpFinder();
-
- CuratorFramework curator = GridTestUtils.getFieldValue(zkIpFinder, "curator");
-
- curator.blockUntilConnected();
- }
-
- // stop all grids
- stopAllGrids();
-
- assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
- }
-
- /**
- * @param ignite Node.
- * @param joinEvtCnt Expected events number.
- * @return Events latch.
- */
- private CountDownLatch expectJoinEvents(Ignite ignite, int joinEvtCnt) {
- final CountDownLatch latch = new CountDownLatch(joinEvtCnt);
-
- ignite.events().remoteListen(new IgniteBiPredicate<UUID, Event>() {
- @Override public boolean apply(UUID uuid, Event evt) {
- latch.countDown();
- return true;
- }
- }, null, EventType.EVT_NODE_JOINED);
-
- return latch;
- }
-}
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTestSuite.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTestSuite.java
deleted file mode 100644
index a5b6d8f..0000000
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTestSuite.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp.ipfinder.zk;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-/**
- * Zookeeper IP Finder tests.
- */
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- ZookeeperIpFinderTest.class
-})
-public class ZookeeperIpFinderTestSuite {
-
-}