| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.twill.discovery; |
| |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.collect.HashMultimap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Multimap; |
| import com.google.common.hash.Hashing; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.apache.twill.common.Cancellable; |
| import org.apache.twill.common.Threads; |
| import org.apache.twill.internal.Constants; |
| import org.apache.twill.zookeeper.NodeChildren; |
| import org.apache.twill.zookeeper.NodeData; |
| import org.apache.twill.zookeeper.OperationFuture; |
| import org.apache.twill.zookeeper.ZKClient; |
| import org.apache.twill.zookeeper.ZKClients; |
| import org.apache.twill.zookeeper.ZKOperations; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.net.InetSocketAddress; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| /** |
| * Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}. |
| * <p> |
| * Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default. |
| * If you would like to change the namespace under which the services are registered then you can pass |
| * in the namespace during construction of {@link ZKDiscoveryService}. |
| * </p> |
| * |
| * <p> |
| * Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services |
| * and also for discovering the registered services. |
| * </p> |
| * |
| * <blockquote> |
| * <pre> |
| * {@code |
| * |
| * DiscoveryService service = new ZKDiscoveryService(zkClient); |
| * service.register(new Discoverable() { |
| * @Override |
| * public String getName() { |
| * return 'service-name'; |
| * } |
| * |
| * @Override |
| * public InetSocketAddress getSocketAddress() { |
| * return new InetSocketAddress(hostname, port); |
| * } |
| * }); |
| * ... |
| * ... |
| * ServiceDiscovered services = service.discovery("service-name"); |
| * ... |
| * } |
| * </pre> |
| * </blockquote> |
| */ |
| public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient { |
| private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class); |
| |
| private static final long RETRY_MILLIS = 1000; |
| |
| // In memory map for recreating ephemeral nodes after session expires. |
| // It map from discoverable to the corresponding Cancellable |
| private final Multimap<Discoverable, DiscoveryCancellable> discoverables; |
| private final Lock lock; |
| |
| private final LoadingCache<String, ServiceDiscovered> services; |
| private final ZKClient zkClient; |
| private final ScheduledExecutorService retryExecutor; |
| |
| /** |
| * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry. |
| * @param zkClient The {@link ZKClient} for interacting with zookeeper. |
| */ |
| public ZKDiscoveryService(ZKClient zkClient) { |
| this(zkClient, Constants.DISCOVERY_PATH_PREFIX); |
| } |
| |
| /** |
| * Constructs ZKDiscoveryService using the provided zookeeper client for storing service registry under namespace. |
| * @param zkClient of zookeeper quorum |
| * @param namespace under which the service registered would be stored in zookeeper. |
| * If namespace is {@code null}, no namespace will be used. |
| */ |
| public ZKDiscoveryService(ZKClient zkClient, String namespace) { |
| this.discoverables = HashMultimap.create(); |
| this.lock = new ReentrantLock(); |
| this.retryExecutor = Executors.newSingleThreadScheduledExecutor( |
| Threads.createDaemonThreadFactory("zk-discovery-retry")); |
| this.zkClient = namespace == null ? zkClient : ZKClients.namespace(zkClient, namespace); |
| this.services = CacheBuilder.newBuilder().build(createServiceLoader()); |
| this.zkClient.addConnectionWatcher(createConnectionWatcher()); |
| } |
| |
| /** |
| * Registers a {@link Discoverable} in zookeeper. |
| * <p> |
| * Registering a {@link Discoverable} will create a node <base>/<service-name> |
| * in zookeeper as a ephemeral node. If the node already exists (timeout associated with emphemeral node creation), |
| * then a runtime exception is thrown to make sure that a service with an intent to register is not started without |
| * registering. |
| * When a runtime exception is thrown, expectation is that the process being started will fail and would be started |
| * again by the monitoring service. |
| * </p> |
| * @param discoverable Information of the service provider that could be discovered. |
| * @return An instance of {@link Cancellable} |
| */ |
| @Override |
| public Cancellable register(final Discoverable discoverable) { |
| final Discoverable wrapper = new DiscoverableWrapper(discoverable); |
| final SettableFuture<String> future = SettableFuture.create(); |
| final DiscoveryCancellable cancellable = new DiscoveryCancellable(wrapper); |
| |
| // Create the zk ephemeral node. |
| Futures.addCallback(doRegister(wrapper), new FutureCallback<String>() { |
| @Override |
| public void onSuccess(String result) { |
| // Set the sequence node path to cancellable for future cancellation. |
| cancellable.setPath(result); |
| lock.lock(); |
| try { |
| discoverables.put(wrapper, cancellable); |
| } finally { |
| lock.unlock(); |
| } |
| LOG.debug("Service registered: {} {}", wrapper, result); |
| future.set(result); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| if (t instanceof KeeperException.NodeExistsException) { |
| handleRegisterFailure(discoverable, future, this, t); |
| } else { |
| LOG.warn("Failed to register: {}", wrapper, t); |
| future.setException(t); |
| } |
| } |
| }, Threads.SAME_THREAD_EXECUTOR); |
| |
| Futures.getUnchecked(future); |
| return cancellable; |
| } |
| |
| @Override |
| public ServiceDiscovered discover(String service) { |
| return services.getUnchecked(service); |
| } |
| |
| /** |
| * Handle registration failure. |
| * |
| * @param discoverable The discoverable to register. |
| * @param completion A settable future to set when registration is completed / failed. |
| * @param creationCallback A future callback for path creation. |
| * @param failureCause The original cause of failure. |
| */ |
| private void handleRegisterFailure(final Discoverable discoverable, |
| final SettableFuture<String> completion, |
| final FutureCallback<String> creationCallback, |
| final Throwable failureCause) { |
| |
| final String path = getNodePath(discoverable); |
| Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() { |
| @Override |
| public void onSuccess(Stat result) { |
| if (result == null) { |
| // If the node is gone, simply retry. |
| LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable); |
| retryRegister(discoverable, creationCallback); |
| return; |
| } |
| |
| long ephemeralOwner = result.getEphemeralOwner(); |
| if (ephemeralOwner == 0) { |
| // it is not an ephemeral node, something wrong. |
| LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.", |
| path, discoverable); |
| completion.setException(failureCause); |
| return; |
| } |
| Long sessionId = zkClient.getSessionId(); |
| if (sessionId == null || ephemeralOwner != sessionId) { |
| // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying. |
| LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable); |
| retryRegister(discoverable, creationCallback); |
| } else { |
| // This client owned the node, treat the registration as completed. |
| // This could happen if same client tries to register twice (due to mistake or failure race condition). |
| completion.set(path); |
| } |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| // If exists call failed, simply retry creation. |
| LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable); |
| retryRegister(discoverable, creationCallback); |
| } |
| }, Threads.SAME_THREAD_EXECUTOR); |
| } |
| |
| private OperationFuture<String> doRegister(Discoverable discoverable) { |
| byte[] discoverableBytes = DiscoverableAdapter.encode(discoverable); |
| return zkClient.create(getNodePath(discoverable), discoverableBytes, CreateMode.EPHEMERAL, true); |
| } |
| |
| private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) { |
| retryExecutor.schedule(new Runnable() { |
| |
| @Override |
| public void run() { |
| Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR); |
| } |
| }, RETRY_MILLIS, TimeUnit.MILLISECONDS); |
| } |
| |
| |
| /** |
| * Generate unique node path for a given {@link Discoverable}. |
| * @param discoverable An instance of {@link Discoverable}. |
| * @return A node name based on the discoverable. |
| */ |
| private String getNodePath(Discoverable discoverable) { |
| InetSocketAddress socketAddress = discoverable.getSocketAddress(); |
| String node = Hashing.md5() |
| .newHasher() |
| .putBytes(socketAddress.getAddress().getAddress()) |
| .putInt(socketAddress.getPort()) |
| .hash().toString(); |
| |
| return String.format("/%s/%s", discoverable.getName(), node); |
| } |
| |
| private Watcher createConnectionWatcher() { |
| return new Watcher() { |
| // Watcher is invoked from single event thread, hence safe to use normal mutable variable. |
| private boolean expired; |
| |
| @Override |
| public void process(WatchedEvent event) { |
| if (event.getState() == Event.KeeperState.Expired) { |
| LOG.warn("ZK Session expired: {}", zkClient.getConnectString()); |
| expired = true; |
| } else if (event.getState() == Event.KeeperState.SyncConnected && expired) { |
| LOG.info("Reconnected after expiration: {}", zkClient.getConnectString()); |
| expired = false; |
| |
| // Re-register all services |
| lock.lock(); |
| try { |
| for (final Map.Entry<Discoverable, DiscoveryCancellable> entry : discoverables.entries()) { |
| LOG.info("Re-registering service: {}", entry.getKey()); |
| |
| // Must be non-blocking in here. |
| Futures.addCallback(doRegister(entry.getKey()), new FutureCallback<String>() { |
| @Override |
| public void onSuccess(String result) { |
| // Updates the cancellable to the newly created sequential node. |
| entry.getValue().setPath(result); |
| LOG.debug("Service re-registered: {} {}", entry.getKey(), result); |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| // When failed to create the node, there would be no retry and simply make the cancellable do nothing. |
| entry.getValue().setPath(null); |
| LOG.error("Failed to re-register service: {}", entry.getKey(), t); |
| } |
| }, Threads.SAME_THREAD_EXECUTOR); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * Creates a CacheLoader for creating live Iterable for watching instances changes for a given service. |
| */ |
| private CacheLoader<String, ServiceDiscovered> createServiceLoader() { |
| return new CacheLoader<String, ServiceDiscovered>() { |
| @Override |
| public ServiceDiscovered load(String service) throws Exception { |
| final DefaultServiceDiscovered serviceDiscovered = new DefaultServiceDiscovered(service); |
| final String serviceBase = "/" + service; |
| |
| // Watch for children changes in /service |
| ZKOperations.watchChildren(zkClient, serviceBase, new ZKOperations.ChildrenCallback() { |
| @Override |
| public void updated(NodeChildren nodeChildren) { |
| // Fetch data of all children nodes in parallel. |
| List<String> children = nodeChildren.getChildren(); |
| List<OperationFuture<NodeData>> dataFutures = Lists.newArrayListWithCapacity(children.size()); |
| for (String child : children) { |
| dataFutures.add(zkClient.getData(serviceBase + "/" + child)); |
| } |
| |
| // Update the service map when all fetching are done. |
| final ListenableFuture<List<NodeData>> fetchFuture = Futures.successfulAsList(dataFutures); |
| fetchFuture.addListener(new Runnable() { |
| @Override |
| public void run() { |
| ImmutableSet.Builder<Discoverable> builder = ImmutableSet.builder(); |
| for (NodeData nodeData : Futures.getUnchecked(fetchFuture)) { |
| // For successful fetch, decode the content. |
| if (nodeData != null) { |
| Discoverable discoverable = DiscoverableAdapter.decode(nodeData.getData()); |
| if (discoverable != null) { |
| builder.add(discoverable); |
| } |
| } |
| } |
| serviceDiscovered.setDiscoverables(builder.build()); |
| } |
| }, Threads.SAME_THREAD_EXECUTOR); |
| } |
| }); |
| return serviceDiscovered; |
| } |
| }; |
| } |
| |
| /** |
| * Inner class for cancelling (un-register) discovery service. |
| */ |
| private final class DiscoveryCancellable implements Cancellable { |
| |
| private final Discoverable discoverable; |
| private final AtomicBoolean cancelled; |
| private volatile String path; |
| |
| DiscoveryCancellable(Discoverable discoverable) { |
| this.discoverable = discoverable; |
| this.cancelled = new AtomicBoolean(); |
| } |
| |
| /** |
| * Set the zk node path representing the ephemeral sequence node of this registered discoverable. |
| * Called from ZK event thread when creating of the node completed, either from normal registration or |
| * re-registration due to session expiration. |
| * |
| * @param path The path to ephemeral sequence node. |
| */ |
| void setPath(String path) { |
| this.path = path; |
| if (cancelled.get() && path != null) { |
| // Simply delete the path if it's already cancelled |
| // It's for the case when session expire happened and re-registration completed after this has been cancelled. |
| // Not bother with the result as if there is error, nothing much we could do. |
| zkClient.delete(path); |
| } |
| } |
| |
| @Override |
| public void cancel() { |
| if (!cancelled.compareAndSet(false, true)) { |
| return; |
| } |
| |
| // Take a snapshot of the volatile path. |
| String path = this.path; |
| |
| // If it is null, meaning cancel() is called before the ephemeral node is created, hence |
| // setPath() will be called in future (through zk callback when creation is completed) |
| // so that deletion will be done in setPath(). |
| if (path == null) { |
| return; |
| } |
| |
| // Remove this Cancellable from the map so that upon session expiration won't try to register. |
| lock.lock(); |
| try { |
| discoverables.remove(discoverable, this); |
| } finally { |
| lock.unlock(); |
| } |
| |
| // Delete the path. It's ok if the path not exists |
| // (e.g. what session expired and before node has been re-created) |
| Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path), |
| KeeperException.NoNodeException.class, path)); |
| LOG.debug("Service unregistered: {} {}", discoverable, path); |
| } |
| } |
| } |
| |