blob: 21c9e0708100c5ef01181ec82a80e2f724713049 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.x.discovery.details;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.discovery.ServiceCache;
import org.apache.curator.x.discovery.ServiceCacheBuilder;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.ServiceProviderBuilder;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.strategies.RoundRobinStrategy;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
/**
* A mechanism to register and query service instances using ZooKeeper
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T>
{
// Logger bound to the runtime class of this instance.
private final Logger log = LoggerFactory.getLogger(getClass());
// Curator client used for every ZooKeeper operation performed by this class.
private final CuratorFramework client;
// Root ZooKeeper path under which service names (and below them, instance ids) are stored.
private final String basePath;
// Converts service instances to and from the bytes stored in each instance node.
private final InstanceSerializer<T> serializer;
// Locally registered instances, keyed by instance id.
private final ConcurrentMap<String, Entry<T>> services = Maps.newConcurrentMap();
// Service caches reported via cacheOpened/cacheClosed; closed in close().
private final Collection<ServiceCache<T>> caches = Sets.newSetFromMap(Maps.<ServiceCache<T>, Boolean>newConcurrentMap());
// Service providers reported via providerOpened/providerClosed; closed in close().
private final Collection<ServiceProvider<T>> providers = Sets.newSetFromMap(Maps.<ServiceProvider<T>, Boolean>newConcurrentMap());
// When false, no NodeCache is created for locally registered instances.
private final boolean watchInstances;
/**
 * Re-registers every locally known instance whenever the client reports
 * {@code CONNECTED} or {@code RECONNECTED}. Failures are logged, never propagated.
 */
private final ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        boolean isConnectedState = (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED);
        if ( !isConnectedState )
        {
            // every other state change is ignored
            return;
        }

        try
        {
            log.debug("Re-registering due to reconnection");
            reRegisterServices();
        }
        catch ( Exception registrationFailure )
        {
            log.error("Could not re-register instances after reconnection", registrationFailure);
        }
    }
};
/**
* Bookkeeping record for one locally registered instance. Code in the enclosing
* class synchronizes on the Entry object itself when reading or changing it.
*/
private static class Entry<T>
{
// Latest known state of the instance; replaced by updateService and by the node-cache listener.
private volatile ServiceInstance<T> service;
// Cache watching the instance's node; null when instance watching is disabled or after unregistration.
private volatile NodeCache cache;
private Entry(ServiceInstance<T> service)
{
this.service = service;
}
}
/**
 * Builds a discovery instance. Nothing is written to ZooKeeper here; a non-null
 * {@code thisInstance} is only recorded locally so that {@link #start()} registers it.
 *
 * @param client the client; must not be null
 * @param basePath base path to store data; must not be null
 * @param serializer serializer for instances (e.g. {@link JsonInstanceSerializer}); must not be null
 * @param thisInstance instance that represents the service that is running, or null for none
 * @param watchInstances if true, watches for changes to locally registered instances
 */
public ServiceDiscoveryImpl(CuratorFramework client, String basePath, InstanceSerializer<T> serializer, ServiceInstance<T> thisInstance, boolean watchInstances)
{
    this.watchInstances = watchInstances;
    this.client = Preconditions.checkNotNull(client, "client cannot be null");
    this.basePath = Preconditions.checkNotNull(basePath, "basePath cannot be null");
    this.serializer = Preconditions.checkNotNull(serializer, "serializer cannot be null");

    if ( thisInstance == null )
    {
        return;
    }

    // makeNodeCache relies on the fields assigned above, so it must run after them
    Entry<T> ownEntry = new Entry<T>(thisInstance);
    ownEntry.cache = makeNodeCache(thisInstance);
    services.put(thisInstance.getId(), ownEntry);
}
/**
 * The discovery must be started before use. Registers every locally known instance
 * and then subscribes to connection state changes so that instances are registered
 * again after a (re)connection.
 *
 * @throws Exception any registration error other than a {@link KeeperException};
 *         a {@link KeeperException} is only logged
 */
@Override
public void start() throws Exception
{
    try
    {
        reRegisterServices();
    }
    catch ( KeeperException registrationFailure )
    {
        // not fatal: the connection state listener below re-registers on (re)connect
        log.error("Could not register instances - will try again later", registrationFailure);
    }

    client.getConnectionStateListenable().addListener(connectionStateListener);
}
/**
 * Shuts this discovery instance down: stops reacting to connection state changes,
 * closes every open service cache and service provider, and unregisters every
 * locally registered instance. Failures while unregistering are logged and do not
 * stop the remaining instances from being processed.
 *
 * @throws IOException declared by the interface; not thrown by this implementation's own code
 */
@Override
public void close() throws IOException
{
    // Detach the listener first: if it stayed attached until the end, a reconnect
    // arriving during shutdown could re-register instances that were just removed.
    client.getConnectionStateListenable().removeListener(connectionStateListener);

    // Iterate over copies so that entries removed from the live collections
    // while closing do not interfere with the iteration.
    for ( ServiceCache<T> cache : Lists.newArrayList(caches) )
    {
        CloseableUtils.closeQuietly(cache);
    }
    for ( ServiceProvider<T> provider : Lists.newArrayList(providers) )
    {
        CloseableUtils.closeQuietly(provider);
    }

    for ( Entry<T> entry : services.values() )
    {
        try
        {
            internalUnregisterService(entry);
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // ignore
        }
        catch ( Exception e )
        {
            log.error("Could not unregister instance: " + entry.service.getName(), e);
        }
    }
}
/**
 * Register/re-register/update a service instance.
 * <p>
 * If no instance with the same id is known locally, a new entry (and, when instance
 * watching is enabled, a node cache) is created. If one is already known, its stored
 * instance is replaced by {@code service} so that a later re-registration after a
 * reconnect publishes this data rather than the previously stored instance.
 *
 * @param service service to add
 * @throws Exception errors
 */
@Override
public void registerService(ServiceInstance<T> service) throws Exception
{
    Entry<T> newEntry = new Entry<T>(service);
    Entry<T> oldEntry = services.putIfAbsent(service.getId(), newEntry);
    Entry<T> useEntry = (oldEntry != null) ? oldEntry : newEntry;
    synchronized(useEntry)
    {
        if ( useEntry == newEntry ) // i.e. is new
        {
            useEntry.cache = makeNodeCache(service);
        }
        else
        {
            // keep the local record in step with what is about to be written to ZooKeeper
            useEntry.service = service;
        }
        internalRegisterService(service);
    }
}
/**
 * Replaces the data of an already registered instance, both in the local record
 * and in the instance's ZooKeeper node.
 *
 * @param service the new state of the instance; its id must already be registered
 * @throws Exception if no instance with this id is registered locally, or on serialization/ZooKeeper errors
 */
@Override
public void updateService(final ServiceInstance<T> service) throws Exception
{
    Entry<T> registered = services.get(service.getId());
    if ( registered == null )
    {
        throw new Exception("Service not registered: " + service);
    }

    synchronized(registered)
    {
        // the local record is updated before the write to ZooKeeper is attempted
        registered.service = service;
        byte[] payload = serializer.serialize(service);
        String instancePath = pathForInstance(service.getName(), service.getId());
        client.setData().forPath(instancePath, payload);
    }
}
/**
 * Writes the instance's node to ZooKeeper. {@code DYNAMIC} instances are created as
 * ephemeral nodes, all others as persistent nodes. If the node already exists it is
 * deleted and created again, for at most two creation attempts in total.
 *
 * @param service the instance to write
 * @throws Exception serialization or ZooKeeper errors
 */
@VisibleForTesting
protected void internalRegisterService(ServiceInstance<T> service) throws Exception
{
    byte[] bytes = serializer.serialize(service);
    String path = pathForInstance(service.getName(), service.getId());
    // the mode does not change between attempts, so compute it once
    CreateMode mode = (service.getServiceType() == ServiceType.DYNAMIC) ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;

    final int MAX_TRIES = 2;
    boolean isDone = false;
    for ( int i = 0; !isDone && (i < MAX_TRIES); ++i )
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().withMode(mode).forPath(path, bytes);
            isDone = true;
        }
        catch ( KeeperException.NodeExistsException e )
        {
            try
            {
                client.delete().forPath(path); // must delete then re-create so that watchers fire
            }
            catch ( KeeperException.NoNodeException ignore )
            {
                // the node disappeared between the failed create and this delete;
                // it is already gone, so simply try to create it again
            }
        }
    }

    if ( !isDone )
    {
        // make the otherwise silent failure visible
        log.warn("Could not register instance after " + MAX_TRIES + " tries: " + service);
    }
}
/**
 * Unregister/remove a service instance. The local record for the instance's id is
 * dropped first; if there was none, nothing else happens.
 *
 * @param service the service
 * @throws Exception errors
 */
@Override
public void unregisterService(ServiceInstance<T> service) throws Exception
{
    // remove() yields null for an unknown id, which internalUnregisterService tolerates
    internalUnregisterService(services.remove(service.getId()));
}
/**
 * Allocate a new builder bound to this discovery instance.
 * {@link ServiceProviderBuilder#providerStrategy} is preset to a {@link RoundRobinStrategy}
 * and the thread factory to one created with the name "ServiceProvider".
 *
 * @return the builder
 */
@Override
public ServiceProviderBuilder<T> serviceProviderBuilder()
{
    ServiceProviderBuilderImpl<T> builder = new ServiceProviderBuilderImpl<T>(this);
    return builder
        .providerStrategy(new RoundRobinStrategy<T>())
        .threadFactory(ThreadUtils.newThreadFactory("ServiceProvider"));
}
/**
 * Allocate a new service cache builder bound to this discovery instance, with its
 * thread factory preset to one created with the name "ServiceCache".
 *
 * @return new cache builder
 */
@Override
public ServiceCacheBuilder<T> serviceCacheBuilder()
{
    ServiceCacheBuilderImpl<T> builder = new ServiceCacheBuilderImpl<T>(this);
    return builder.threadFactory(ThreadUtils.newThreadFactory("ServiceCache"));
}
/**
 * Return the names of all known services, i.e. the children of the base path.
 *
 * @return immutable list of service names
 * @throws Exception errors
 */
@Override
public Collection<String> queryForNames() throws Exception
{
    return ImmutableList.copyOf(client.getChildren().forPath(basePath));
}
/**
* Return all known instances for the given service. Delegates to
* {@link #queryForInstances(String, Watcher)} with no watcher.
*
* @param name name of the service
* @return list of instances (or an empty list)
* @throws Exception errors
*/
@Override
public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception
{
return queryForInstances(name, null);
}
/**
 * Return a service instance POJO, read and deserialized from the instance's node.
 *
 * @param name name of the service
 * @param id ID of the instance
 * @return the instance or <code>null</code> if not found
 * @throws Exception errors
 */
@Override
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception
{
    try
    {
        return serializer.deserialize(client.getData().forPath(pathForInstance(name, id)));
    }
    catch ( KeeperException.NoNodeException ignore )
    {
        // a missing node simply means "no such instance"
        return null;
    }
}
/**
* Records a service cache so that {@link #close()} can close it.
*
* @param cache the cache to track
*/
void cacheOpened(ServiceCache<T> cache)
{
caches.add(cache);
}
/**
* Stops tracking a service cache.
*
* @param cache the cache to forget
*/
void cacheClosed(ServiceCache<T> cache)
{
caches.remove(cache);
}
/**
* Records a service provider so that {@link #close()} can close it.
*
* @param provider the provider to track
*/
void providerOpened(ServiceProvider<T> provider)
{
providers.add(provider);
}
/**
* Stops tracking a service provider.
*
* @param cache the provider to forget (despite its name, this parameter is a service provider, not a cache)
*/
void providerClosed(ServiceProvider<T> cache)
{
providers.remove(cache);
}
/**
* @return the Curator client this instance was constructed with
*/
CuratorFramework getClient()
{
return client;
}
/**
* Builds the ZooKeeper path of a service: the base path joined with the service name.
*
* @param name name of the service
* @return path of the node under which the service's instances are stored
*/
String pathForName(String name)
{
return ZKPaths.makePath(basePath, name);
}
/**
* @return the instance serializer this instance was constructed with
*/
InstanceSerializer<T> getSerializer()
{
return serializer;
}
/**
 * Loads every instance currently stored under the given service name.
 * <p>
 * Without a watcher, a missing service node yields an empty list. With a watcher,
 * the children are read through {@code getChildrenWatched}, which creates the
 * service node if it is missing so that the watcher can be set.
 *
 * @param name name of the service
 * @param watcher watcher to set on the service node's children, or null for none
 * @return immutable list of the instances that could be read (possibly empty)
 * @throws Exception errors
 */
List<ServiceInstance<T>> queryForInstances(String name, Watcher watcher) throws Exception
{
    String servicePath = pathForName(name);

    List<String> ids;
    if ( watcher == null )
    {
        try
        {
            ids = client.getChildren().forPath(servicePath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            ids = Lists.newArrayList();
        }
    }
    else
    {
        ids = getChildrenWatched(servicePath, watcher, true);
    }

    ImmutableList.Builder<ServiceInstance<T>> found = ImmutableList.builder();
    for ( String instanceId : ids )
    {
        ServiceInstance<T> candidate = queryForInstance(name, instanceId);
        if ( candidate == null )
        {
            // the node vanished between listing the children and reading it
            continue;
        }
        found.add(candidate);
    }
    return found.build();
}
/**
* @return the number of entries in the local map of registered instances
*/
@VisibleForTesting
int debugServicesQty()
{
return services.size();
}
/**
 * Lists the children of {@code path} while setting {@code watcher} on it.
 * If the node does not exist and {@code recurse} is true, the node is created
 * (a concurrent creation by someone else is tolerated) and the listing is
 * attempted exactly once more.
 *
 * @param path node whose children are wanted
 * @param watcher watcher to set
 * @param recurse whether a missing node may be created and the listing retried
 * @return names of the node's children
 * @throws Exception ZooKeeper errors, including a missing node when {@code recurse} is false
 */
private List<String> getChildrenWatched(String path, Watcher watcher, boolean recurse) throws Exception
{
    try
    {
        return client.getChildren().usingWatcher(watcher).forPath(path);
    }
    catch ( KeeperException.NoNodeException missing )
    {
        if ( !recurse )
        {
            throw missing;
        }
    }

    // Only reached when the node was missing and one retry is allowed.
    try
    {
        client.create().creatingParentContainersIfNeeded().forPath(path);
    }
    catch ( KeeperException.NodeExistsException ignore )
    {
        // ignore
    }
    return getChildrenWatched(path, watcher, false);
}
/**
 * Builds the ZooKeeper path of one instance: the service's path joined with the instance id.
 *
 * @param name name of the service
 * @param id ID of the instance
 * @return path of the instance's node
 */
@VisibleForTesting
String pathForInstance(String name, String id)
{
    String servicePath = pathForName(name);
    return ZKPaths.makePath(servicePath, id);
}
/**
 * Looks up the locally recorded state of a registered instance.
 *
 * @param id ID of the instance
 * @return the recorded instance, or null if no instance with this id is registered locally
 */
@VisibleForTesting
ServiceInstance<T> getRegisteredService(String id)
{
    Entry<T> found = services.get(id);
    if ( found == null )
    {
        return null;
    }
    return found.service;
}
/**
 * Writes every locally recorded instance to ZooKeeper again, holding each
 * entry's lock while its instance is being registered.
 *
 * @throws Exception the first registration error; remaining entries are not processed
 */
private void reRegisterServices() throws Exception
{
    for ( final Entry<T> registered : services.values() )
    {
        synchronized(registered)
        {
            ServiceInstance<T> current = registered.service;
            internalRegisterService(current);
        }
    }
}
/**
 * Creates and starts a cache that watches the node of a locally registered instance
 * and copies changes made to that node back into the local record.
 *
 * @param instance the instance whose node should be watched
 * @return the cache, or null when instance watching is disabled; a cache whose start
 *         failed is still returned (the failure is only logged)
 */
private NodeCache makeNodeCache(final ServiceInstance<T> instance)
{
    if ( !watchInstances )
    {
        return null;
    }
    final NodeCache nodeCache = new NodeCache(client, pathForInstance(instance.getName(), instance.getId()));
    try
    {
        nodeCache.start(true);
    }
    catch ( InterruptedException e )
    {
        // do not swallow the interruption: restore the flag so callers can observe it
        Thread.currentThread().interrupt();
        log.error("Could not start node cache for: " + instance, e);
    }
    catch ( Exception e )
    {
        log.error("Could not start node cache for: " + instance, e);
    }
    NodeCacheListener listener = new NodeCacheListener()
    {
        @Override
        public void nodeChanged() throws Exception
        {
            // Read the cached data once: it can become null between two separate
            // reads if the node is deleted in the meantime.
            org.apache.curator.framework.recipes.cache.ChildData currentData = nodeCache.getCurrentData();
            if ( currentData != null )
            {
                ServiceInstance<T> newInstance = serializer.deserialize(currentData.getData());
                Entry<T> entry = services.get(newInstance.getId());
                if ( entry != null )
                {
                    synchronized(entry)
                    {
                        entry.service = newInstance;
                    }
                }
            }
            else
            {
                log.warn("Instance data has been deleted for: " + instance);
            }
        }
    };
    nodeCache.getListenable().addListener(listener);
    return nodeCache;
}
/**
 * Closes the entry's node cache (if any) and deletes the instance's node.
 * A null entry and an already missing node are both tolerated.
 *
 * @param entry the local record of the instance to remove, or null
 * @throws Exception ZooKeeper errors other than the node not existing
 */
private void internalUnregisterService(final Entry<T> entry) throws Exception
{
    if ( entry == null )
    {
        return;
    }

    synchronized(entry)
    {
        NodeCache watched = entry.cache;
        if ( watched != null )
        {
            CloseableUtils.closeQuietly(watched);
            entry.cache = null;
        }

        ServiceInstance<T> registered = entry.service;
        String instancePath = pathForInstance(registered.getName(), registered.getId());
        try
        {
            client.delete().guaranteed().forPath(instancePath);
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // ignore
        }
    }
}
}