| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.spi.discovery.tcp.ipfinder.zk; |
| |
| import java.net.InetSocketAddress; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import com.google.common.collect.Sets; |
| import org.apache.curator.RetryPolicy; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.framework.imps.CuratorFrameworkState; |
| import org.apache.curator.retry.ExponentialBackoffRetry; |
| import org.apache.curator.x.discovery.ServiceDiscovery; |
| import org.apache.curator.x.discovery.ServiceDiscoveryBuilder; |
| import org.apache.curator.x.discovery.ServiceInstance; |
| import org.apache.curator.x.discovery.UriSpec; |
| import org.apache.curator.x.discovery.details.JsonInstanceSerializer; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.SystemProperty; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.spi.IgniteSpiException; |
| import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinderAdapter; |
| import org.codehaus.jackson.map.annotate.JsonRootName; |
| |
| /** |
| * This TCP Discovery IP Finder uses Apache ZooKeeper (ZK) to locate peer nodes when bootstrapping in order to join |
| * the cluster. It uses the Apache Curator library to interact with ZooKeeper in a simple manner. Specifically, |
| * it uses the {@link ServiceDiscovery} recipe, which makes use of ephemeral nodes in ZK to register services. |
| * |
| * <p> |
| * There are several ways to instantiate the TcpDiscoveryZookeeperIpFinder: |
| * <li> |
| * <ul>By providing an instance of {@link CuratorFramework} directly, in which case no ZK Connection String |
| * is required.</ul> |
| * <ul>By providing a ZK Connection String through {@link #setZkConnectionString(String)}, and optionally |
| * a {@link RetryPolicy} through the setter. If the latter is not provided, a default |
| * {@link ExponentialBackoffRetry} policy is used, with a base sleep time of 1000ms and 10 retries.</ul> |
| * <ul>By providing a ZK Connection String through system property {@link #PROP_ZK_CONNECTION_STRING}. If this |
| * property is set, it overrides the ZK Connection String passed in as a property, but it does not override |
| * the {@link CuratorFramework} if provided.</ul> |
| * </li> |
| * |
| * You may customise the base path for services, as well as the service name. By default {@link #BASE_PATH} and |
| * {@link #SERVICE_NAME} are use respectively. You can also choose to enable or disable duplicate registrations. See |
| * {@link #setAllowDuplicateRegistrations(boolean)} for more details. |
| * |
| * @see <a href="http://zookeeper.apache.org">Apache ZooKeeper</a> |
| * @see <a href="http://curator.apache.org">Apache Curator</a> |
| */ |
| public class TcpDiscoveryZookeeperIpFinder extends TcpDiscoveryIpFinderAdapter { |
| /** System property name to provide the ZK Connection String. */ |
| @SystemProperty(value = "Zookeeper connection string", type = String.class) |
| public static final String PROP_ZK_CONNECTION_STRING = "IGNITE_ZK_CONNECTION_STRING"; |
| |
| /** Default base path for service registrations. */ |
| private static final String BASE_PATH = "/services"; |
| |
| /** Default service name for service registrations. */ |
| private static final String SERVICE_NAME = "ignite"; |
| |
| /** Default URI Spec to use with the {@link ServiceDiscoveryBuilder}. */ |
| private static final UriSpec URI_SPEC = new UriSpec("{address}:{port}"); |
| |
| /** Init guard. */ |
| @GridToStringExclude |
| private final AtomicBoolean initGuard = new AtomicBoolean(); |
| |
| /** Init guard. */ |
| @GridToStringExclude |
| private final AtomicBoolean closeGuard = new AtomicBoolean(); |
| |
| /** Logger. */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** The Curator framework in use, either injected or constructed by this component. */ |
| private CuratorFramework curator; |
| |
| /** The ZK Connection String if provided by the user. */ |
| private String zkConnectionString; |
| |
| /** Retry policy to use. */ |
| private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); |
| |
| /** Base path to use, by default {#link #BASE_PATH}. */ |
| private String basePath = BASE_PATH; |
| |
| /** Service name to use, by default {#link #SERVICE_NAME}. */ |
| private String serviceName = SERVICE_NAME; |
| |
| /** Whether to allow or not duplicate registrations. See setter doc. */ |
| private boolean allowDuplicateRegistrations; |
| |
| /** The Service Discovery recipe. */ |
| private ServiceDiscovery<IgniteInstanceDetails> discovery; |
| |
| /** Map of the {#link ServiceInstance}s we have registered. */ |
| private Map<InetSocketAddress, ServiceInstance<IgniteInstanceDetails>> ourInstances = new ConcurrentHashMap<>(); |
| |
| /** Constructor. */ |
| public TcpDiscoveryZookeeperIpFinder() { |
| setShared(true); |
| } |
| |
| /** Initializes this IP Finder by creating the appropriate Curator objects. */ |
| private void init() { |
| if (!initGuard.compareAndSet(false, true)) |
| return; |
| |
| String sysPropZkConnString = IgniteSystemProperties.getString(PROP_ZK_CONNECTION_STRING); |
| |
| if (sysPropZkConnString != null && !sysPropZkConnString.trim().isEmpty()) |
| zkConnectionString = sysPropZkConnString; |
| |
| if (log.isInfoEnabled()) |
| log.info("Initializing ZooKeeper IP Finder."); |
| |
| if (curator == null) { |
| A.notNullOrEmpty(zkConnectionString, String.format("ZooKeeper URL (or system property %s) cannot be null " + |
| "or empty if a CuratorFramework object is not provided explicitly", PROP_ZK_CONNECTION_STRING)); |
| curator = CuratorFrameworkFactory.newClient(zkConnectionString, retryPolicy); |
| } |
| |
| if (curator.getState() == CuratorFrameworkState.LATENT) |
| curator.start(); |
| |
| A.ensure(curator.getState() == CuratorFrameworkState.STARTED, "CuratorFramework can't be started."); |
| |
| discovery = ServiceDiscoveryBuilder.builder(IgniteInstanceDetails.class) |
| .client(curator) |
| .basePath(basePath) |
| .serializer(new JsonInstanceSerializer<>(IgniteInstanceDetails.class)) |
| .build(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onSpiContextDestroyed() { |
| if (!closeGuard.compareAndSet(false, true)) { |
| U.warn(log, "ZooKeeper IP Finder can't be closed more than once."); |
| |
| return; |
| } |
| |
| if (log.isInfoEnabled()) |
| log.info("Destroying ZooKeeper IP Finder."); |
| |
| super.onSpiContextDestroyed(); |
| |
| if (curator != null) |
| curator.close(); |
| |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { |
| init(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Getting registered addresses from ZooKeeper IP Finder."); |
| |
| Collection<ServiceInstance<IgniteInstanceDetails>> serviceInstances; |
| |
| try { |
| serviceInstances = discovery.queryForInstances(serviceName); |
| } |
| catch (Exception e) { |
| log.warning("Error while getting registered addresses from ZooKeeper IP Finder.", e); |
| return Collections.emptyList(); |
| } |
| |
| Set<InetSocketAddress> answer = new HashSet<>(); |
| |
| for (ServiceInstance<IgniteInstanceDetails> si : serviceInstances) |
| answer.add(new InetSocketAddress(si.getAddress(), si.getPort())); |
| |
| if (log.isInfoEnabled()) |
| log.info("ZooKeeper IP Finder resolved addresses: " + answer); |
| |
| return answer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { |
| init(); |
| |
| if (log.isInfoEnabled()) |
| log.info("Registering addresses with ZooKeeper IP Finder: " + addrs); |
| |
| Set<InetSocketAddress> registrationsToIgnore = Sets.newHashSet(); |
| if (!allowDuplicateRegistrations) { |
| try { |
| for (ServiceInstance<IgniteInstanceDetails> sd : discovery.queryForInstances(serviceName)) |
| registrationsToIgnore.add(new InetSocketAddress(sd.getAddress(), sd.getPort())); |
| } |
| catch (Exception e) { |
| log.warning("Error while finding currently registered services to avoid duplicate registrations", e); |
| throw new IgniteSpiException(e); |
| } |
| } |
| |
| for (InetSocketAddress addr : addrs) { |
| if (registrationsToIgnore.contains(addr)) |
| continue; |
| |
| try { |
| ServiceInstance<IgniteInstanceDetails> si = ServiceInstance.<IgniteInstanceDetails>builder() |
| .name(serviceName) |
| .uriSpec(URI_SPEC) |
| .address(addr.getAddress().getHostAddress()) |
| .port(addr.getPort()) |
| .build(); |
| |
| ourInstances.put(addr, si); |
| |
| discovery.registerService(si); |
| |
| } |
| catch (Exception e) { |
| log.warning(String.format("Error while registering an address from ZooKeeper IP Finder " + |
| "[message=%s,addresses=%s]", e.getMessage(), addr), e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { |
| |
| // if curator is not STARTED, we have nothing to unregister, because we are using ephemeral nodes, |
| // which means that our addresses will only be registered in ZK as long as our connection is alive |
| if (curator.getState() != CuratorFrameworkState.STARTED) |
| return; |
| |
| if (log.isInfoEnabled()) |
| log.info("Unregistering addresses with ZooKeeper IP Finder: " + addrs); |
| |
| for (InetSocketAddress addr : addrs) { |
| ServiceInstance<IgniteInstanceDetails> si = ourInstances.get(addr); |
| if (si == null) { |
| log.warning("Asked to unregister address from ZooKeeper IP Finder, but no match was found in local " + |
| "instance map for: " + addrs); |
| continue; |
| } |
| |
| try { |
| discovery.unregisterService(si); |
| } |
| catch (Exception e) { |
| log.warning("Error while unregistering an address from ZooKeeper IP Finder: " + addr, e); |
| } |
| } |
| } |
| |
| /** |
| * @param curator A {@link CuratorFramework} instance to use. It can already be in <tt>STARTED</tt> state. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setCurator(CuratorFramework curator) { |
| this.curator = curator; |
| |
| return this; |
| } |
| |
| /** |
| * @return The ZooKeeper connection string, only if set explicitly. Else, it returns null. |
| */ |
| public String getZkConnectionString() { |
| return zkConnectionString; |
| } |
| |
| /** |
| * @param zkConnectionString ZooKeeper connection string in case a {@link CuratorFramework} is not being set |
| * explicitly. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setZkConnectionString(String zkConnectionString) { |
| this.zkConnectionString = zkConnectionString; |
| |
| return this; |
| } |
| |
| /** |
| * @return Retry policy in use if, and only if, it was set explicitly. Else, it returns null. |
| */ |
| public RetryPolicy getRetryPolicy() { |
| return retryPolicy; |
| } |
| |
| /** |
| * @param retryPolicy {@link RetryPolicy} to use in case a ZK Connection String is being injected, or if using a |
| * system property. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setRetryPolicy(RetryPolicy retryPolicy) { |
| this.retryPolicy = retryPolicy; |
| |
| return this; |
| } |
| |
| /** |
| * @return Base path for service registration in ZK. Default value: {@link #BASE_PATH}. |
| */ |
| public String getBasePath() { |
| return basePath; |
| } |
| |
| /** |
| * @param basePath Base path for service registration in ZK. If not passed, {@link #BASE_PATH} will be used. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setBasePath(String basePath) { |
| this.basePath = basePath; |
| |
| return this; |
| } |
| |
| /** |
| * @return Service name being used, in Curator terms. See {@link #setServiceName(String)} for more information. |
| */ |
| public String getServiceName() { |
| return serviceName; |
| } |
| |
| /** |
| * @param serviceName Service name to use, as defined by Curator's {#link ServiceDiscovery} recipe. In physical ZK |
| * terms, it represents the node under {@link #basePath}, under which services will be registered. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setServiceName(String serviceName) { |
| this.serviceName = serviceName; |
| |
| return this; |
| } |
| |
| /** |
| * @return The value of this flag. See {@link #setAllowDuplicateRegistrations(boolean)} for more details. |
| */ |
| public boolean isAllowDuplicateRegistrations() { |
| return allowDuplicateRegistrations; |
| } |
| |
| /** |
| * @param allowDuplicateRegistrations Whether to register each node only once, or if duplicate registrations are |
| * allowed. Nodes will attempt to register themselves, plus those they know about. By default, duplicate |
| * registrations are not allowed, but you might want to set this property to <tt>true</tt> if you have multiple |
| * network interfaces or if you are facing troubles. |
| * @return {@code this} for chaining. |
| */ |
| public TcpDiscoveryZookeeperIpFinder setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) { |
| this.allowDuplicateRegistrations = allowDuplicateRegistrations; |
| |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public TcpDiscoveryZookeeperIpFinder setShared(boolean shared) { |
| super.setShared(shared); |
| |
| return this; |
| } |
| |
| /** |
| * Empty DTO for storing service instances details. Currently acting as a placeholder because Curator requires a |
| * payload type when registering and discovering nodes. May be enhanced in the future with further information to |
| * assist discovery. |
| * |
| * @author Raul Kripalani |
| */ |
| @JsonRootName("ignite_instance_details") |
| private class IgniteInstanceDetails { |
| |
| } |
| |
| } |