blob: 4f2d0f7c583d4a02ff6f760f1f3b717456184219 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.discovery;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.Constants;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Zookeeper implementation of {@link DiscoveryService} and {@link DiscoveryServiceClient}.
* <p>
* Discoverable services are registered within Zookeeper under the namespace 'discoverable' by default.
* If you would like to change the namespace under which the services are registered then you can pass
* in the namespace during construction of {@link ZKDiscoveryService}.
* </p>
*
* <p>
* Following is a simple example of how {@link ZKDiscoveryService} can be used for registering services
* and also for discovering the registered services.
* </p>
*
* <blockquote>
* <pre>
* {@code
*
* ZKDiscoveryService service = new ZKDiscoveryService(zkClient);
* service.register(new Discoverable() {
* &#64;Override
* public String getName() {
* return "service-name";
* }
*
* &#64;Override
* public InetSocketAddress getSocketAddress() {
* return new InetSocketAddress(hostname, port);
* }
* });
* ...
* ...
* ServiceDiscovered services = service.discover("service-name");
* ...
* }
* </pre>
* </blockquote>
*/
public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient {
// Logger for this class.
private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class);
// Delay, in milliseconds, before a registration attempt is retried (used by retryRegister).
private static final long RETRY_MILLIS = 1000;
// In-memory map used for recreating ephemeral nodes after the ZK session expires.
// It maps each registered discoverable to its corresponding Cancellable(s).
private final Multimap<Discoverable, DiscoveryCancellable> discoverables;
// Held around every read and write of the discoverables multimap.
private final Lock lock;
// Cache from service name to its ServiceDiscovered; entries are built by the loader from createServiceLoader().
private final LoadingCache<String, ServiceDiscovered> services;
// ZK client used for all operations; wrapped with a namespace when one is supplied at construction.
private final ZKClient zkClient;
// Single-threaded scheduler on which delayed registration retries are scheduled.
private final ScheduledExecutorService retryExecutor;
/**
 * Constructs a ZKDiscoveryService that stores the service registry through the provided zookeeper client,
 * using {@link Constants#DISCOVERY_PATH_PREFIX} as the namespace.
 *
 * @param zkClient The {@link ZKClient} for interacting with zookeeper.
 */
public ZKDiscoveryService(ZKClient zkClient) {
this(zkClient, Constants.DISCOVERY_PATH_PREFIX);
}
/**
 * Constructs a ZKDiscoveryService that keeps the service registry in zookeeper under the given namespace.
 * <p>
 * Besides initializing internal state, this registers a connection watcher on the (possibly namespaced)
 * client so that registered services can be re-created after a session expiration.
 * </p>
 *
 * @param zkClient The {@link ZKClient} for interacting with the zookeeper quorum.
 * @param namespace The namespace under which registered services are stored in zookeeper.
 *                  If it is {@code null}, the given client is used as is, without any namespace.
 */
public ZKDiscoveryService(ZKClient zkClient, String namespace) {
  this.lock = new ReentrantLock();
  this.discoverables = HashMultimap.create();
  this.retryExecutor = Executors.newSingleThreadScheduledExecutor(
    Threads.createDaemonThreadFactory("zk-discovery-retry"));

  // Only wrap the client when a namespace is actually requested.
  if (namespace == null) {
    this.zkClient = zkClient;
  } else {
    this.zkClient = ZKClients.namespace(zkClient, namespace);
  }

  this.services = CacheBuilder.newBuilder().build(createServiceLoader());

  // Registered last so that all fields are initialized before the watcher can be invoked.
  this.zkClient.addConnectionWatcher(createConnectionWatcher());
}
/**
 * Registers a {@link Discoverable} in zookeeper and blocks until the registration has completed.
 * <p>
 * Registering a {@link Discoverable} creates an ephemeral node under &lt;base&gt;/&lt;service-name&gt;
 * in zookeeper. If the creation fails because the node already exists, the failure is handed over to
 * {@code handleRegisterFailure}, which decides whether to retry, to treat the registration as completed,
 * or to fail it. Any other creation failure fails the registration immediately.
 * </p>
 * <p>
 * When the registration fails, an unchecked exception is thrown from this method, so that a service with
 * an intent to register is not started without being registered. The expectation is that the process being
 * started will then fail and be started again by the monitoring service.
 * </p>
 *
 * @param discoverable Information of the service provider that could be discovered.
 * @return An instance of {@link Cancellable} for un-registering the service.
 */
@Override
public Cancellable register(final Discoverable discoverable) {
  final DiscoveryCancellable registration = new DiscoveryCancellable(discoverable);
  final SettableFuture<String> completion = SettableFuture.create();

  // Callback for the ephemeral node creation. It is also reused for retried creations.
  final FutureCallback<String> creationCallback = new FutureCallback<String>() {
    @Override
    public void onSuccess(String nodePath) {
      // Remember the node path in the cancellable for future cancellation.
      registration.setPath(nodePath);

      // Track the registration so that it can be re-created after a session expiration.
      lock.lock();
      try {
        discoverables.put(discoverable, registration);
      } finally {
        lock.unlock();
      }

      LOG.debug("Service registered: {} {}", discoverable, nodePath);
      completion.set(nodePath);
    }

    @Override
    public void onFailure(Throwable cause) {
      if (!(cause instanceof KeeperException.NodeExistsException)) {
        LOG.warn("Failed to register: {}", discoverable, cause);
        completion.setException(cause);
        return;
      }
      // The node is already there; inspect it to decide between retrying, completing or failing.
      handleRegisterFailure(discoverable, completion, this, cause);
    }
  };

  // Create the zk ephemeral node.
  Futures.addCallback(doRegister(discoverable), creationCallback, Threads.SAME_THREAD_EXECUTOR);

  // Block until the registration is completed; a failure surfaces as an unchecked exception.
  Futures.getUnchecked(completion);
  return registration;
}
/**
 * Returns the {@link ServiceDiscovered} for the given service name, looked up from the {@code services}
 * cache. The cache builds missing entries through the loader created by {@code createServiceLoader()}.
 *
 * @param service Name of the service to discover.
 * @return The {@link ServiceDiscovered} held in the cache for the given service name.
 */
@Override
public ServiceDiscovered discover(String service) {
return services.getUnchecked(service);
}
/**
 * Handles a registration failure caused by the node already existing.
 * <p>
 * The existing node is inspected through an {@code exists} call and one of the following happens:
 * </p>
 * <ul>
 *   <li>The node is gone, is owned by a different session, the session id of this client is unknown,
 *       or the {@code exists} call itself failed: the creation is retried after a delay.</li>
 *   <li>The node is not ephemeral: the registration is failed with the original failure cause.</li>
 *   <li>The node is ephemeral and owned by the session of this client: the registration is treated as
 *       successfully completed, exactly as if the creation had succeeded.</li>
 * </ul>
 *
 * @param discoverable The discoverable to register.
 * @param completion A settable future to set when registration is completed / failed.
 * @param creationCallback A future callback for path creation. It is used for retried creations and its
 *                         {@code onSuccess} is invoked when the existing node turns out to be owned by
 *                         this client.
 * @param failureCause The original cause of failure.
 */
private void handleRegisterFailure(final Discoverable discoverable,
                                   final SettableFuture<String> completion,
                                   final FutureCallback<String> creationCallback,
                                   final Throwable failureCause) {
  final String path = getNodePath(discoverable);
  Futures.addCallback(zkClient.exists(path), new FutureCallback<Stat>() {
    @Override
    public void onSuccess(Stat result) {
      if (result == null) {
        // If the node is gone, simply retry.
        LOG.info("Node {} is gone. Retry registration for {}.", path, discoverable);
        retryRegister(discoverable, creationCallback);
        return;
      }

      long ephemeralOwner = result.getEphemeralOwner();
      if (ephemeralOwner == 0) {
        // it is not an ephemeral node, something wrong.
        LOG.error("Node {} already exists and is not an ephemeral node. Discoverable registration failed: {}.",
                  path, discoverable);
        completion.setException(failureCause);
        return;
      }

      Long sessionId = zkClient.getSessionId();
      if (sessionId == null || ephemeralOwner != sessionId) {
        // This zkClient is not valid or doesn't own the ephemeral node, simply keep retrying.
        LOG.info("Owner of {} is different. Retry registration for {}.", path, discoverable);
        retryRegister(discoverable, creationCallback);
      } else {
        // This client owned the node, treat the registration as completed.
        // This could happen if same client tries to register twice (due to mistake or failure race condition).
        // Go through the creation callback so that the success handling of a normal creation is applied
        // (in register() that means recording the node path in the cancellable and tracking the
        // registration for re-creation after session expiration). Completing only the future would hand
        // back a cancellable that cannot remove the node.
        creationCallback.onSuccess(path);
        // Safety net in case the callback does not complete the future; a no-op if it is already set.
        completion.set(path);
      }
    }

    @Override
    public void onFailure(Throwable t) {
      // If exists call failed, simply retry creation. Log the cause so that the failure can be diagnosed.
      LOG.warn("Error when getting stats on {}. Retry registration for {}.", path, discoverable, t);
      retryRegister(discoverable, creationCallback);
    }
  }, Threads.SAME_THREAD_EXECUTOR);
}
/**
 * Issues the asynchronous creation of the ephemeral node representing the given discoverable.
 *
 * @param discoverable The discoverable to create a node for.
 * @return The future of the create operation, carrying the created path on success.
 */
private OperationFuture<String> doRegister(Discoverable discoverable) {
  // The node content is the encoded discoverable; the node path is derived from its name and address.
  final byte[] payload = DiscoverableAdapter.encode(discoverable);
  final String nodePath = getNodePath(discoverable);
  return zkClient.create(nodePath, payload, CreateMode.EPHEMERAL, true);
}
/**
 * Schedules another attempt to create the node of the given discoverable after {@code RETRY_MILLIS}.
 *
 * @param discoverable The discoverable to register.
 * @param creationCallback The callback to attach to the retried creation.
 */
private void retryRegister(final Discoverable discoverable, final FutureCallback<String> creationCallback) {
  final Runnable retryTask = new Runnable() {
    @Override
    public void run() {
      OperationFuture<String> creation = doRegister(discoverable);
      Futures.addCallback(creation, creationCallback, Threads.SAME_THREAD_EXECUTOR);
    }
  };
  retryExecutor.schedule(retryTask, RETRY_MILLIS, TimeUnit.MILLISECONDS);
}
/**
 * Generates the node path for a given {@link Discoverable}.
 * <p>
 * The path has the form {@code /<name>/<hash>}, where the hash is the MD5 of the raw IP address bytes
 * followed by the port of the discoverable's socket address.
 * </p>
 * <p>
 * NOTE: {@link InetSocketAddress#getAddress()} returns {@code null} for an unresolved address, in which
 * case this method fails with a {@link NullPointerException}.
 * </p>
 *
 * @param discoverable An instance of {@link Discoverable}.
 * @return A node path based on the discoverable.
 */
private String getNodePath(Discoverable discoverable) {
  final InetSocketAddress endpoint = discoverable.getSocketAddress();
  final byte[] rawAddress = endpoint.getAddress().getAddress();
  final int port = endpoint.getPort();

  final String nodeName = Hashing.md5().newHasher()
    .putBytes(rawAddress)
    .putInt(port)
    .hash()
    .toString();

  return String.format("/%s/%s", discoverable.getName(), nodeName);
}
/**
 * Creates the connection watcher that re-registers all tracked discoverables once the client gets
 * connected again after a session expiration.
 *
 * @return A {@link Watcher} to be added as connection watcher of the zookeeper client.
 */
private Watcher createConnectionWatcher() {
  return new Watcher() {
    // Watcher is invoked from single event thread, hence safe to use normal mutable variable.
    private boolean expired;

    @Override
    public void process(WatchedEvent event) {
      final Event.KeeperState state = event.getState();

      if (state == Event.KeeperState.Expired) {
        LOG.warn("ZK Session expired: {}", zkClient.getConnectString());
        expired = true;
        return;
      }
      if (state != Event.KeeperState.SyncConnected || !expired) {
        // Only a connect that follows an expiration needs handling.
        return;
      }

      LOG.info("Reconnected after expiration: {}", zkClient.getConnectString());
      expired = false;

      // Re-register all services while holding the lock that protects the map.
      lock.lock();
      try {
        for (Map.Entry<Discoverable, DiscoveryCancellable> tracked : discoverables.entries()) {
          reRegister(tracked.getKey(), tracked.getValue());
        }
      } finally {
        lock.unlock();
      }
    }

    /**
     * Starts the creation of the node for one tracked discoverable and updates its cancellable
     * when the creation completes. Must be non-blocking, as it is called from the watcher.
     */
    private void reRegister(final Discoverable discoverable, final DiscoveryCancellable cancellable) {
      LOG.info("Re-registering service: {}", discoverable);
      Futures.addCallback(doRegister(discoverable), new FutureCallback<String>() {
        @Override
        public void onSuccess(String nodePath) {
          // Point the cancellable to the newly created node.
          cancellable.setPath(nodePath);
          LOG.debug("Service re-registered: {} {}", discoverable, nodePath);
        }

        @Override
        public void onFailure(Throwable cause) {
          // When failed to create the node, there is no retry; clear the path of the cancellable.
          cancellable.setPath(null);
          LOG.error("Failed to re-register service: {}", discoverable, cause);
        }
      }, Threads.SAME_THREAD_EXECUTOR);
    }
  };
}
/**
 * Creates the {@link CacheLoader} backing the {@code services} cache.
 * <p>
 * For each service name, the loader creates a {@link ServiceDiscovered} and sets up a watch on the
 * children of {@code /<service>}. On every children update, the data of all children is fetched and
 * the successfully fetched and decoded discoverables replace the content of the {@link ServiceDiscovered}.
 * </p>
 */
private CacheLoader<String, ServiceDiscovered> createServiceLoader() {
  return new CacheLoader<String, ServiceDiscovered>() {
    @Override
    public ServiceDiscovered load(String service) throws Exception {
      final DefaultServiceDiscovered discovered = new DefaultServiceDiscovered(service);
      final String basePath = "/" + service;

      // Watch for children changes in /service
      ZKOperations.watchChildren(zkClient, basePath, new ZKOperations.ChildrenCallback() {
        @Override
        public void updated(NodeChildren nodeChildren) {
          // Kick off the data fetch of every child node without waiting in between.
          List<String> childNames = nodeChildren.getChildren();
          List<OperationFuture<NodeData>> pendingFetches = Lists.newArrayListWithCapacity(childNames.size());
          for (String childName : childNames) {
            pendingFetches.add(zkClient.getData(basePath + "/" + childName));
          }

          // Refresh the discoverables once every fetch has finished, whether it succeeded or not.
          final ListenableFuture<List<NodeData>> allFetched = Futures.successfulAsList(pendingFetches);
          allFetched.addListener(new Runnable() {
            @Override
            public void run() {
              ImmutableSet.Builder<Discoverable> decoded = ImmutableSet.builder();
              for (NodeData fetched : Futures.getUnchecked(allFetched)) {
                if (fetched == null) {
                  // No data for this child (null entry in the fetch result); skip it.
                  continue;
                }
                Discoverable discoverable = DiscoverableAdapter.decode(fetched.getData());
                if (discoverable == null) {
                  // Content could not be decoded into a discoverable; skip it.
                  continue;
                }
                decoded.add(discoverable);
              }
              discovered.setDiscoverables(decoded.build());
            }
          }, Threads.SAME_THREAD_EXECUTOR);
        }
      });
      return discovered;
    }
  };
}
/**
 * Inner class for cancelling (un-registering) a discoverable registered through the enclosing service.
 */
private final class DiscoveryCancellable implements Cancellable {

  private final Discoverable discoverable;
  private final AtomicBoolean cancelled;
  // Path of the zk node currently representing this registration; null when no such node is known.
  private volatile String path;

  DiscoveryCancellable(Discoverable discoverable) {
    this.discoverable = discoverable;
    this.cancelled = new AtomicBoolean();
  }

  /**
   * Sets the zk node path representing the ephemeral node of this registered discoverable.
   * Called when creation of the node has completed, either from normal registration or
   * from re-registration due to session expiration. If this cancellable has already been cancelled,
   * a non-null path is deleted right away.
   *
   * @param path The path to the ephemeral node, or {@code null} if there is no node (creation failed).
   */
  void setPath(String path) {
    this.path = path;
    if (cancelled.get() && path != null) {
      // Simply delete the path if it's already cancelled
      // It's for the case when session expire happened and re-registration completed after this has been cancelled.
      // Not bother with the result as if there is error, nothing much we could do.
      zkClient.delete(path);
    }
  }

  /**
   * Un-registers the discoverable. Only the first call has any effect.
   * <p>
   * The registration is always removed from the in-memory map, so that it is not re-created after a
   * session expiration. If a node path is known, the node is deleted and this call blocks until the
   * deletion has completed; a missing node is not treated as an error.
   * </p>
   */
  @Override
  public void cancel() {
    if (!cancelled.compareAndSet(false, true)) {
      return;
    }

    // Remove this Cancellable from the map so that upon session expiration won't try to register.
    // This must happen even when no path is known (e.g. after a failed re-registration cleared it);
    // otherwise the cancelled registration would stay in the map and be re-created on every expiration.
    lock.lock();
    try {
      discoverables.remove(discoverable, this);
    } finally {
      lock.unlock();
    }

    // Take a snapshot of the volatile path.
    String path = this.path;

    // If it is null, there is no node to delete right now. Should a pending creation complete later,
    // setPath() will be called (through zk callback) and the deletion will be done in setPath().
    if (path == null) {
      return;
    }

    // Delete the path. It's ok if the path not exists
    // (e.g. when session expired and before node has been re-created)
    Futures.getUnchecked(ZKOperations.ignoreError(zkClient.delete(path),
                                                  KeeperException.NoNodeException.class, path));
    LOG.debug("Service unregistered: {} {}", discoverable, path);
  }
}
}