blob: c177302a053a02196210cfdab4799e56b47893d5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.utils.PathUtils;
/**
* <p>
* Abstraction to select a "leader" amongst multiple contenders in a group of JVMs connected
* to a Zookeeper cluster. If a group of N threads/processes contends for leadership, one will
* be assigned leader until it releases leadership at which time another one from the group will
* be chosen.
* </p>
* <p>
* Note that this class uses an underlying {@link InterProcessMutex} and as a result leader
* election is "fair" - each user will become leader in the order originally requested
* (from ZK's point of view).
* </p>
*/
public class LeaderSelector implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
// the caller's listener wrapped in a WrappedListener (assigned in the primary constructor)
private final LeaderSelectorListener listener;
// every election attempt is submitted to this service (see internalRequeue())
private final CloseableExecutorService executorService;
// holding this mutex is what "being leader" means (see doWork())
private final InterProcessMutex mutex;
// lifecycle: LATENT -> STARTED (start()) -> CLOSED (close())
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
// when true, a finished election task immediately queues another one
private final AtomicBoolean autoRequeue = new AtomicBoolean(false);
// the most recently submitted election task; cancelled by interruptLeadership()
private final AtomicReference<Future<?>> ourTask = new AtomicReference<Future<?>>(null);

// true only between a successful mutex.acquire() and the matching release in doWork()
private volatile boolean hasLeadership;
// participant id supplied to the mutex as lock-node data; empty means "no id"
private volatile String id = "";

// test hook: counted down by doWork() right after leadership is acquired
@VisibleForTesting
volatile CountDownLatch debugLeadershipLatch = null;
// test hook: doWork() waits on this before invoking the listener
@VisibleForTesting
volatile CountDownLatch debugLeadershipWaitLatch = null;

private enum State
{
    LATENT,
    STARTED,
    CLOSED
}

// guarded by synchronization
private boolean isQueued = false;

private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("LeaderSelector");
/**
 * Creates a selector that runs its election work on a single-thread executor created
 * here from the class-wide {@code defaultThreadFactory}.
 *
 * @param client the client
 * @param leaderPath the path for this leadership group
 * @param listener listener
 */
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener)
{
// NOTE(review): the 'true' flag presumably makes the wrapper shut the executor down on close - confirm against CloseableExecutorService
this(client, leaderPath, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true), listener);
}
/**
 * Creates a selector that runs its election work on a plain {@link Executor}, adapted
 * to an {@link ExecutorService} by {@code wrapExecutor}.
 *
 * @param client the client
 * @param leaderPath the path for this leadership group
 * @param threadFactory ignored - this constructor never reads it
 * @param executor the executor to run in
 * @param listener listener
 * @deprecated This constructor was poorly thought out. Custom executor is useless. Use this version instead: {@link #LeaderSelector(CuratorFramework, String, ExecutorService, LeaderSelectorListener)}
 */
@SuppressWarnings("UnusedParameters")
@Deprecated
public LeaderSelector(CuratorFramework client, String leaderPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
{
this(client, leaderPath, new CloseableExecutorService(wrapExecutor(executor), true), listener);
}
/**
 * Creates a selector that runs its election work on the supplied thread pool, which is
 * wrapped in a {@link CloseableExecutorService} before being handed to the primary
 * constructor.
 *
 * @param client the client
 * @param leaderPath the path for this leadership group
 * @param executorService thread pool to use
 * @param listener listener
 */
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener)
{
this(client, leaderPath, new CloseableExecutorService(executorService), listener);
}
/**
 * Primary constructor; every other constructor delegates here. Validates the arguments,
 * wraps the caller's listener in a {@code WrappedListener} and creates the
 * {@link InterProcessMutex} on {@code leaderPath} whose ownership represents leadership.
 *
 * @param client the client
 * @param leaderPath the path for this leadership group, checked with {@link PathUtils#validatePath(String)}
 * @param executorService thread pool to use
 * @param listener listener
 * @throws NullPointerException if {@code client}, {@code listener} or {@code executorService} is null
 */
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener)
{
    Preconditions.checkNotNull(client, "client cannot be null");
    PathUtils.validatePath(leaderPath);
    Preconditions.checkNotNull(listener, "listener cannot be null");
    // fail fast here instead of with an unexplained NPE later, when start() first touches the executor
    Preconditions.checkNotNull(executorService, "executorService cannot be null");

    this.client = client;
    this.listener = new WrappedListener(this, listener);
    hasLeadership = false;

    this.executorService = executorService;
    mutex = new InterProcessMutex(client, leaderPath)
    {
        @Override
        protected byte[] getLockNodeBytes()
        {
            // the current id (if any) becomes the lock node's data; null when no id has been set
            return (id.length() > 0) ? getIdBytes(id) : null;
        }
    };
}
/**
 * Encodes a participant id as UTF-8 bytes.
 *
 * @param id the id to encode
 * @return the UTF-8 encoding of {@code id}
 */
static byte[] getIdBytes(String id)
{
    final byte[] encoded;
    try
    {
        encoded = id.getBytes("UTF-8");
    }
    catch ( UnsupportedEncodingException impossible )
    {
        // UTF-8 support is mandatory on every JVM, so this should never happen
        throw new Error(impossible);
    }
    return encoded;
}
/**
 * By default, when {@link LeaderSelectorListener#takeLeadership(CuratorFramework)} returns, this
 * instance is not requeued. Calling this method puts the leader selector into a mode where it
 * will always requeue itself: the flag set here is checked each time an election task
 * finishes. There is no method in this class that clears the flag again.
 */
public void autoRequeue()
{
autoRequeue.set(true);
}
/**
 * Sets the ID to store for this leader. Will be the value returned
 * when {@link #getParticipants()} is called. IMPORTANT: must be called
 * prior to {@link #start()} to have effect.
 *
 * @param id ID, must not be null
 * @throws NullPointerException if {@code id} is null
 */
public void setId(String id)
{
    // checkNotNull hands back its argument, so validation and assignment fit in one statement
    this.id = Preconditions.checkNotNull(id, "id cannot be null");
}
/**
 * Return the ID that was set via {@link #setId(String)}; an empty string if none was set.
 *
 * @return id
 */
public String getId()
{
return id;
}
/**
 * Attempt leadership. This attempt is done in the background - i.e. this method returns
 * immediately.<br><br>
 * <b>IMPORTANT: </b> previous versions allowed this method to be called multiple times. This
 * is no longer supported. Use {@link #requeue()} for this purpose.
 *
 * @throws IllegalStateException if this selector was already started or closed, if its
 *         executor reports itself shut down, or if leadership is already held
 */
public void start()
{
    // the LATENT -> STARTED transition can succeed only once
    boolean firstStart = state.compareAndSet(State.LATENT, State.STARTED);
    Preconditions.checkState(firstStart, "Cannot be started more than once");

    boolean executorUsable = !executorService.isShutdown();
    Preconditions.checkState(executorUsable, "Already started");

    boolean notLeaderYet = !hasLeadership;
    Preconditions.checkState(notLeaderYet, "Already has leadership");

    // register for connection state changes before queueing the first election attempt
    client.getConnectionStateListenable().addListener(listener);
    requeue();
}
/**
 * Re-queue an attempt for leadership. If this instance is already queued, nothing
 * happens and false is returned. If the instance was not queued, it is re-queued and true
 * is returned.
 *
 * @return true if re-queue is successful
 * @throws IllegalStateException if this selector is not in the started state (this
 *         includes a selector that has not been started yet, even though the message
 *         only mentions close())
 */
public boolean requeue()
{
    boolean started = (state.get() == State.STARTED);
    Preconditions.checkState(started, "close() has already been called");
    return internalRequeue();
}
/**
 * Submits a new election task unless one is already queued or this selector is not in
 * the STARTED state. The method is synchronized, which is what guards {@code isQueued}.
 *
 * @return true if a task was submitted, false if nothing was done
 */
private synchronized boolean internalRequeue()
{
if ( !isQueued && (state.get() == State.STARTED) )
{
// mark as queued before submitting so a concurrent caller cannot submit a second task
isQueued = true;
Future<Void> task = executorService.submit(new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try
{
doWorkLoop();
}
finally
{
// runs however doWorkLoop() ended: allow queueing again, then requeue if asked to
clearIsQueued();
if ( autoRequeue.get() )
{
internalRequeue();
}
}
return null;
}
});
// remember the task so interruptLeadership() can cancel it
ourTask.set(task);
return true;
}
return false;
}
/**
 * Shutdown this selector and remove yourself from the leadership group. The connection
 * state listener is removed, the executor service is closed and the reference to the
 * current election task is dropped.
 *
 * @throws IllegalStateException if this selector was never started or is already closed
 */
public synchronized void close()
{
    // only the STARTED -> CLOSED transition is legal
    boolean closedNow = state.compareAndSet(State.STARTED, State.CLOSED);
    Preconditions.checkState(closedNow, "Already closed or has not been started");

    client.getConnectionStateListenable().removeListener(listener);
    executorService.close();
    ourTask.set(null);
}
/**
 * <p>
 * Returns the set of current participants in the leader selection
 * </p>
 * <p>
 * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
 * return a value that does not match {@link #hasLeadership()} as hasLeadership
 * uses a local field of the class.
 * </p>
 *
 * @return participants
 * @throws Exception ZK errors, interruptions, etc.
 */
public Collection<Participant> getParticipants() throws Exception
{
    return getParticipants(client, mutex.getParticipantNodes());
}
/**
 * Builds one {@link Participant} per node path, in iteration order. Only the first path
 * is ever flagged as leader. A path whose node no longer exists is skipped, and if that
 * happens to be the first path, no participant in the result is flagged as leader.
 *
 * @param client client used to read each node
 * @param participantNodes participant node paths, leader's node first
 * @return immutable list of the participants that could be read
 * @throws Exception any error other than a missing node
 */
static Collection<Participant> getParticipants(CuratorFramework client, Collection<String> participantNodes) throws Exception
{
    ImmutableList.Builder<Participant> found = ImmutableList.builder();

    int position = 0;
    for ( String nodePath : participantNodes )
    {
        // by definition the first node is the leader
        boolean first = (position++ == 0);
        try
        {
            found.add(participantForPath(client, nodePath, first));
        }
        catch ( KeeperException.NoNodeException ignore )
        {
            // the node disappeared after the paths were listed - leave it out
        }
    }
    return found.build();
}
/**
 * <p>
 * Return the id for the current leader. If for some reason there is no
 * current leader, a dummy participant is returned.
 * </p>
 * <p>
 * <B>NOTE</B> - this method polls the ZK server. Therefore it can possibly
 * return a value that does not match {@link #hasLeadership()} as hasLeadership
 * uses a local field of the class.
 * </p>
 *
 * @return leader
 * @throws Exception ZK errors, interruptions, etc.
 */
public Participant getLeader() throws Exception
{
    return getLeader(client, mutex.getParticipantNodes());
}
/**
 * Returns the participant stored at the first node path, flagged as leader, or a
 * default-constructed {@link Participant} when there are no paths.
 *
 * @param client client used to read the node
 * @param participantNodes participant node paths, leader's node first
 * @return the leader, or a dummy participant if the collection is empty
 * @throws Exception ZK errors, interruptions, etc.
 */
static Participant getLeader(CuratorFramework client, Collection<String> participantNodes) throws Exception
{
    if ( participantNodes.isEmpty() )
    {
        return new Participant();
    }

    String leaderPath = participantNodes.iterator().next();
    return participantForPath(client, leaderPath, true);
}
/**
 * Return true if leadership is currently held by this instance. This reads a local
 * volatile flag maintained by the election task; it does not contact ZooKeeper.
 *
 * @return true/false
 */
public boolean hasLeadership()
{
return hasLeadership;
}
/**
 * Attempt to cancel and interrupt the current leadership if this instance has leadership.
 * Concretely, the most recently submitted election task (if one is recorded) is cancelled
 * with interruption; nothing happens when no task is recorded.
 */
public synchronized void interruptLeadership()
{
    Future<?> running = ourTask.get();
    if ( running == null )
    {
        return;
    }
    running.cancel(true);
}
/**
 * Reads the data stored at {@code path}, decodes it as UTF-8 and wraps it in a
 * {@link Participant}.
 *
 * @param client client used to read the node
 * @param path node to read
 * @param markAsLeader leader flag to record on the participant
 * @return the participant described by the node
 * @throws Exception ZK errors, interruptions, etc.
 */
private static Participant participantForPath(CuratorFramework client, String path, boolean markAsLeader) throws Exception
{
    byte[] nodeData = client.getData().forPath(path);
    return new Participant(new String(nodeData, "UTF-8"), markAsLeader);
}
/**
 * Runs one leadership cycle on the calling thread: blocks until the mutex is acquired,
 * invokes the listener's {@code takeLeadership} while holding it, then releases the mutex.
 * <p>
 * An {@link InterruptedException} raised while leading is propagated with the thread's
 * interrupt flag set again. Any other throwable from the listener is passed to
 * {@code ThreadUtils.checkInterrupted} and otherwise not propagated.
 * </p>
 *
 * @throws Exception errors from acquiring the mutex, or an interruption
 */
@VisibleForTesting
void doWork() throws Exception
{
    hasLeadership = false;
    try
    {
        mutex.acquire();

        hasLeadership = true;
        try
        {
            // test hooks: announce that leadership was acquired, then optionally hold here
            if ( debugLeadershipLatch != null )
            {
                debugLeadershipLatch.countDown();
            }
            if ( debugLeadershipWaitLatch != null )
            {
                debugLeadershipWaitLatch.await();
            }
            listener.takeLeadership(client);
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch ( Throwable e )
        {
            // NOTE(review): presumably this only re-flags interruption - confirm against ThreadUtils
            ThreadUtils.checkInterrupted(e);
        }
        finally
        {
            clearIsQueued();
        }
    }
    catch ( InterruptedException e )
    {
        Thread.currentThread().interrupt();
        throw e;
    }
    finally
    {
        if ( hasLeadership )
        {
            hasLeadership = false;
            // An interrupted leader reaches this point with its interrupt flag still set.
            // Clear it for the duration of the release so the release is not itself cut
            // short by that pending interrupt (which would leave the lock node in place),
            // and put the flag back afterwards so callers still observe the interruption.
            boolean wasInterrupted = Thread.interrupted();
            try
            {
                mutex.release();
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                // despite its wording, this message reports a failure to release the mutex
                log.error("The leader threw an exception", e);
                // ignore errors - this is just a safety
            }
            finally
            {
                if ( wasInterrupted )
                {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
/**
 * Runs {@link #doWork()} once and decides what to do with its failure, if any.
 * A connection-loss or session-expired error is rethrown only when auto-requeue is off.
 * An interruption is absorbed after re-setting the thread's interrupt flag. Everything
 * else propagates unchanged.
 *
 * @throws Exception whatever {@code doWork()} threw, subject to the rules above
 */
private void doWorkLoop() throws Exception
{
    KeeperException connectionProblem = null;
    try
    {
        doWork();
    }
    catch ( KeeperException e )
    {
        boolean connectionRelated = (e instanceof KeeperException.ConnectionLossException)
            || (e instanceof KeeperException.SessionExpiredException);
        if ( !connectionRelated )
        {
            throw e;
        }
        connectionProblem = e;
    }
    catch ( InterruptedException ignore )
    {
        Thread.currentThread().interrupt();
    }

    // autoRequeue should ignore connection loss or session expired and just keep trying
    if ( (connectionProblem == null) || autoRequeue.get() )
    {
        return;
    }
    throw connectionProblem;
}
/**
 * Marks this selector as no longer queued, under this instance's monitor - the same
 * lock the synchronized {@code internalRequeue()} holds when it reads the flag.
 */
private void clearIsQueued()
{
    synchronized(this)
    {
        isQueued = false;
    }
}
// Temporary adapter for the deprecated constructor: presents a plain Executor as an
// ExecutorService. It is deliberately minimal - shutdown() only flips a flag,
// shutdownNow() reports no pending tasks, awaitTermination() is unsupported, and the
// service reports itself shut down and terminated once execute() has been called.
private static ExecutorService wrapExecutor(final Executor executor)
{
    return new AbstractExecutorService()
    {
        private volatile boolean shutdownFlag = false;
        private volatile boolean terminatedFlag = false;

        @Override
        public void execute(Runnable command)
        {
            try
            {
                executor.execute(command);
            }
            finally
            {
                // set whether or not the hand-off to the wrapped executor succeeded
                shutdownFlag = true;
                terminatedFlag = true;
            }
        }

        @Override
        public void shutdown()
        {
            shutdownFlag = true;
        }

        @Override
        public List<Runnable> shutdownNow()
        {
            // nothing is queued inside this adapter, so there is never anything to hand back
            return Lists.<Runnable>newArrayList();
        }

        @Override
        public boolean isShutdown()
        {
            return shutdownFlag;
        }

        @Override
        public boolean isTerminated()
        {
            return terminatedFlag;
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
        {
            throw new UnsupportedOperationException();
        }
    };
}
/**
 * Decorator around the caller's listener. Leadership callbacks are passed straight
 * through; a {@link CancelLeadershipException} thrown from the caller's
 * {@code stateChanged} is turned into a call to
 * {@link LeaderSelector#interruptLeadership()} on the owning selector.
 */
private static class WrappedListener implements LeaderSelectorListener
{
    private final LeaderSelector owner;
    private final LeaderSelectorListener delegate;

    public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener)
    {
        this.owner = leaderSelector;
        this.delegate = listener;
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        delegate.takeLeadership(client);
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        try
        {
            delegate.stateChanged(client, newState);
        }
        catch ( CancelLeadershipException cancelRequested )
        {
            // the caller's listener asked for leadership to be given up
            owner.interruptLeadership();
        }
    }
}
}