blob: d0c38a244d3637ebb91b2efb3c99430ada507069 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.leader.election;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.timebuffer.CountSumMinMaxAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.apache.nifi.util.timebuffer.TimestampedLongAggregation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.X509Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
 * {@link LeaderElectionManager} implementation that uses Apache Curator on top of ZooKeeper.
 * Roles are registered by name; for each role this class keeps a Curator LeaderSelector and an
 * ElectionListener, and it records statistics about leader lookups and leadership changes.
 */
public class CuratorLeaderElectionManager implements LeaderElectionManager {
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
// Executor handed to every LeaderSelector created in register().
private final FlowEngine leaderElectionMonitorEngine;
// ZooKeeper client settings derived from NiFiProperties in the constructor.
private final ZooKeeperClientConfig zkConfig;
// Shared Curator client; assigned in start() and reset to null in stop().
private CuratorFramework curatorClient;
// True until start() is called and again after stop(); volatile because isStopped() reads it without locking.
private volatile boolean stopped = true;
// Role name -> selector/listener pair for roles registered while the manager is running.
private final Map<String, LeaderRole> leaderRoles = new HashMap<>();
// Role name -> registration details; replayed through register() by start().
// NOTE(review): isActiveParticipant() reads this HashMap without synchronization while register() writes it under the lock - confirm that is intended.
private final Map<String, RegisteredRole> registeredRoles = new HashMap<>();
// Role name -> buffer of leadership-change events (one entry is added per change by onLeaderChanged()).
private final Map<String, TimedBuffer<TimestampedLong>> leaderChanges = new HashMap<>();
// Durations, in nanoseconds, passed to registerPollTime(); read by the get*PollTime()/getPollCount() methods.
private final TimedBuffer<TimestampedLongAggregation> pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess());
// Role name -> participant id most recently returned by getLeader(); used to detect leader changes.
private final ConcurrentMap<String, String> lastKnownLeader = new ConcurrentHashMap<>();
/**
 * Creates a manager that is initially stopped.
 *
 * @param threadPoolSize size passed to the FlowEngine that backs the leader selectors
 * @param properties NiFi properties from which the ZooKeeper client configuration is created
 */
public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) {
    this.leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
    this.zkConfig = ZooKeeperClientConfig.createConfig(properties);
}
public synchronized void start() {
if (!stopped) {
stopped = false;
curatorClient = createClient();
// Call #register for each already-registered role. This will
// cause us to start listening for leader elections for that
// role again
for (final Map.Entry<String, RegisteredRole> entry : registeredRoles.entrySet()) {
final RegisteredRole role = entry.getValue();
register(entry.getKey(), role.getListener(), role.getParticipantId());
}"{} started", this);
public boolean isActiveParticipant(final String roleName) {
final RegisteredRole role = registeredRoles.get(roleName);
if (role == null) {
return false;
final String participantId = role.getParticipantId();
return participantId != null;
public void register(String roleName, LeaderElectionStateChangeListener listener) {
register(roleName, listener, null);
private String getElectionPath(final String roleName) {
final String rootPath = zkConfig.getRootPath();
final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
return leaderPath;
/**
 * Registers a listener for the given role and, when the manager is running, creates a
 * LeaderSelector and ElectionListener for it.
 *
 * @param roleName the role to register for; used to build the election path
 * @param listener the listener stored with the registration and handed to the ElectionListener
 * @param participantId id of this node in the election; a null or blank value means the node is not a participant
 * @throws IllegalStateException if the election path is rejected with an IllegalArgumentException
 */
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
// If we already have a Leader Role registered and either the Leader Role is participating in election,
// or the given participant id == null (we don't want to participate in the election) then we're done.
final LeaderRole currentRole = leaderRoles.get(roleName);
// NOTE(review): the logging call (and presumably an early return) is truncated on the next line - restore from the upstream file.
if (currentRole != null && (currentRole.isParticipant() || participantId == null)) {"{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
final String leaderPath = getElectionPath(roleName);
// NOTE(review): the try body is empty in this view; presumably the path is validated here (PathUtils is imported) - confirm.
try {
} catch (final IllegalArgumentException e) {
throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
// Remember the registration so that start() can replay it.
registeredRoles.put(roleName, new RegisteredRole(participantId, listener));
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
// A selector is only created while the manager is running.
if (!isStopped()) {
final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
// NOTE(review): nothing is visible inside this branch; presumably the selector is configured and started for participants - confirm.
if (isParticipant) {
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener, isParticipant);
leaderRoles.put(roleName, leaderRole);
// NOTE(review): both logging calls below are truncated to their argument lists.
if (isParticipant) {"{} Registered new Leader Selector for role {}; this node is an active participant in the election.", this, roleName);
} else {"{} Registered new Leader Selector for role {}; this node is a silent observer in the election.", this, roleName);
public synchronized void unregister(final String roleName) {
final LeaderRole leaderRole = leaderRoles.remove(roleName);
if (leaderRole == null) {"Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
if (leaderSelector == null) {"Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
leaderSelector.close();"This node is no longer registered to be elected as the Leader for Role '{}'", roleName);
public synchronized void stop() {
stopped = true;
for (final Map.Entry<String, LeaderRole> entry : leaderRoles.entrySet()) {
final LeaderRole role = entry.getValue();
final LeaderSelector selector = role.getLeaderSelector();
try {
} catch (final Exception e) {
logger.warn("Failed to close Leader Selector for {}", entry.getKey(), e);
if (curatorClient != null) {
curatorClient = null;
}"{} stopped and closed", this);
public boolean isStopped() {
return stopped;
public String toString() {
return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
private synchronized LeaderRole getLeaderRole(final String roleName) {
return leaderRoles.get(roleName);
private synchronized void onLeaderChanged(final String roleName) {
final TimedBuffer<TimestampedLong> buffer = leaderChanges.computeIfAbsent(roleName, key -> new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess()));
buffer.add(new TimestampedLong(1L));
public synchronized Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit unit) {
final Map<String, Integer> leadershipChangesPerRole = new HashMap<>();
for (final Map.Entry<String, TimedBuffer<TimestampedLong>> entry : leaderChanges.entrySet()) {
final String roleName = entry.getKey();
final TimedBuffer<TimestampedLong> buffer = entry.getValue();
final TimestampedLong aggregateValue = buffer.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(duration, unit));
final int leadershipChanges = aggregateValue.getValue().intValue();
leadershipChangesPerRole.put(roleName, leadershipChanges);
return leadershipChangesPerRole;
public boolean isLeader(final String roleName) {
final boolean activeParticipant = isActiveParticipant(roleName);
if (!activeParticipant) {
logger.debug("Node is not an active participant in election for role {} so cannot be leader", roleName);
return false;
final LeaderRole role = getLeaderRole(roleName);
if (role == null) {
logger.debug("Node is an active participant in election for role {} but there is no LeaderRole registered so this node cannot be leader", roleName);
return false;
return role.isLeader();
public String getLeader(final String roleName) {
if (isStopped()) {
return determineLeaderExternal(roleName);
final LeaderRole role = getLeaderRole(roleName);
if (role == null) {
return determineLeaderExternal(roleName);
final long startNanos = System.nanoTime();
Participant participant;
try {
participant = role.getLeaderSelector().getLeader();
} catch (Exception e) {
logger.debug("Unable to determine leader for role '{}'; returning null", roleName);
return null;
if (participant == null) {
return null;
final String participantId = participant.getId();
if (StringUtils.isEmpty(participantId)) {
return null;
registerPollTime(System.nanoTime() - startNanos);
final String previousLeader = lastKnownLeader.put(roleName, participantId);
if (previousLeader != null && !previousLeader.equals(participantId)) {
return participantId;
// Called by getLeader() and determineLeaderExternal() with the elapsed System.nanoTime() delta of a leader lookup.
// NOTE(review): no statements are visible after this signature; presumably the value is added to pollTimes
// (nothing else in this file adds to that buffer) - restore the body from the upstream file.
private synchronized void registerPollTime(final long nanos) {
public synchronized long getAveragePollTime(final TimeUnit timeUnit) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null || aggregation.getCount() == 0) {
return 0L;
final long averageNanos = aggregation.getSum() / aggregation.getCount();
return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS);
public synchronized long getMinPollTime(final TimeUnit timeUnit) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
final long minNanos = aggregation.getMin();
return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS);
public synchronized long getMaxPollTime(final TimeUnit timeUnit) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
final long minNanos = aggregation.getMin();
return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS);
public synchronized long getPollCount() {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
return aggregation.getCount();
/**
 * Determines whether or not leader election has already begun for the role with the given name
 *
 * @param roleName the role of interest
 * @return <code>true</code> if leader election has already begun, <code>false</code> if it has not or if unable to determine this.
 */
public boolean isLeaderElected(final String roleName) {
final String leaderAddress = determineLeaderExternal(roleName);
return !StringUtils.isEmpty(leaderAddress);
/**
 * Use a new Curator client to determine which node is the elected leader for the given role.
 *
 * @param roleName the name of the role
 * @return the id of the elected leader, or <code>null</code> if no leader has been selected or if unable to determine
 *         the leader from ZooKeeper
 */
private String determineLeaderExternal(final String roleName) {
// Start of the timing window that is reported to registerPollTime() in the finally block.
final long start = System.nanoTime();
// The temporary client is closed automatically by try-with-resources.
try (CuratorFramework client = createClient()) {
// Listener whose callbacks have no visible statements; it exists only so that a LeaderSelector can be constructed.
final LeaderSelectorListener electionListener = new LeaderSelectorListener() {
public void stateChanged(CuratorFramework client, ConnectionState newState) {
public void takeLeadership(CuratorFramework client) throws Exception {
final String electionPath = getElectionPath(roleName);
// Note that we intentionally do not auto-requeue here, and we do not start the selector. We do not
// want to join the leader election. We simply want to observe.
final LeaderSelector selector = new LeaderSelector(client, electionPath, electionListener);
try {
final Participant leader = selector.getLeader();
return leader == null ? null : leader.getId();
} catch (final KeeperException.NoNodeException nne) {
// If there is no ZNode, then there is no elected leader.
return null;
} catch (final Exception e) {
logger.warn("Unable to determine the Elected Leader for role '{}' due to {}; assuming no leader has been elected", roleName, e.toString());
// The stack trace is logged (at WARN level) only when debug logging is enabled.
if (logger.isDebugEnabled()) {
logger.warn("", e);
return null;
// NOTE(review): closing braces are missing in this view, so it is unclear whether this finally belongs
// to the inner try or to the try-with-resources - confirm against the upstream file.
} finally {
registerPollTime(System.nanoTime() - start);
// Creates a Curator client from zkConfig. Used by start() for the shared client and by
// determineLeaderExternal() for short-lived lookups.
private CuratorFramework createClient() {
// Create a new client because we don't want to try indefinitely for this to occur.
// NOTE(review): presumably a single retry with a 100 ms pause - confirm against the Curator RetryNTimes documentation.
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
// NOTE(review): retryPolicy and aclProviderFactory are never referenced in the visible lines; the builder
// calls that consume them (connect string, timeouts, retry policy, ACL provider) appear to be missing.
final CuratorACLProviderFactory aclProviderFactory = new CuratorACLProviderFactory();
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.defaultData(new byte[0]);
// For a secure client, install the ZooKeeper factory that is configured from zkConfig's key/trust store settings.
if (zkConfig.isClientSecure()) {
clientBuilder.zookeeperFactory(new SecureClientZooKeeperFactory(zkConfig));
// NOTE(review): the initializer below is empty; presumably the client is built from clientBuilder
// (and possibly started) here - restore from the upstream file.
final CuratorFramework client =;
return client;
private static class LeaderRole {
private final LeaderSelector leaderSelector;
private final ElectionListener electionListener;
private final boolean participant;
public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener, final boolean participant) {
this.leaderSelector = leaderSelector;
this.electionListener = electionListener;
this.participant = participant;
public LeaderSelector getLeaderSelector() {
return leaderSelector;
public ElectionListener getElectionListener() {
return electionListener;
public boolean isLeader() {
return electionListener.isLeader();
public boolean isParticipant() {
return participant;
private static class RegisteredRole {
private final LeaderElectionStateChangeListener listener;
private final String participantId;
public RegisteredRole(final String participantId, final LeaderElectionStateChangeListener listener) {
this.participantId = participantId;
this.listener = listener;
public LeaderElectionStateChangeListener getListener() {
return listener;
public String getParticipantId() {
return participantId;
/**
 * Curator leader-selector listener for a single role. It tracks whether this node currently
 * believes it is the leader and caches that answer for a short period (see MAX_CACHE_MILLIS).
 * This is an inner (non-static) class because it calls methods of the enclosing manager.
 */
private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
// Role this listener was created for.
private final String roleName;
// Callback supplied at registration; may be null (it is null-checked before use in takeLeadership()).
private final LeaderElectionStateChangeListener listener;
// This node's id in the election; compared with the elected leader's id in verifyLeader().
private final String participantId;
// Current belief about leadership; volatile because takeLeadership() reads it in its loop without locking.
private volatile boolean leader;
// Thread that entered takeLeadership(); null until then.
private volatile Thread leaderThread;
// Wall-clock time (millis) of the last setLeader() call; 0 until the first call.
private long leaderUpdateTimestamp = 0L;
// Age after which isLeader() stops trusting the cached flag: 5 seconds expressed in milliseconds.
private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
this.roleName = roleName;
this.listener = listener;
this.participantId = participantId;
// Disables this listener.
// NOTE(review): the logging call on the next line is truncated to its argument list, and the else branch
// has no visible statements; presumably it interrupts leaderThread (the debug message in the other
// branch says no thread will be interrupted) - restore from the upstream file.
public void disable() {"Election Listener for Role {} disabled", roleName);
if (leaderThread == null) {
logger.debug("Election Listener for Role {} disabled but there is no leader thread. Will not interrupt any threads.", roleName);
} else {
public synchronized boolean isLeader() {
if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) {
try {
final long start = System.nanoTime();
final boolean zkLeader = verifyLeader();
final long nanos = System.nanoTime() - start;
logger.debug("Took {} nanoseconds to reach out to ZooKeeper in order to check whether or not this node is currently the leader for Role '{}'. ZooKeeper reported {}",
nanos, roleName, zkLeader);
} catch (final Exception e) {
logger.warn("Attempted to reach out to ZooKeeper to determine whether or not this node is the elected leader for Role '{}' but failed to communicate with ZooKeeper. " +
"Assuming that this node is not the leader.", roleName, e);
return false;
} else {
logger.debug("Checking if this node is leader for role {}: using cached response, returning {}", roleName, leader);
return leader;
private synchronized void setLeader(final boolean leader) {
this.leader = leader;
this.leaderUpdateTimestamp = System.currentTimeMillis();
public synchronized void stateChanged(final CuratorFramework client, final ConnectionState newState) {"{} Connection State changed to {}", this,;
if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
if (leader) {"Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
super.stateChanged(client, newState);
/**
 * Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose
 * its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc.
 *
 * @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise
 */
private boolean verifyLeader() {
final String leader = getLeader(roleName);
if (leader == null) {
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
return false;
final boolean match = leader.equals(participantId);
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
roleName, leader, participantId, match);
return match;
// Invoked when this node wins the election for the role. Marks the node as leader, then loops for as
// long as the manager is running and the leader flag stays set; finally clears the flag again.
// NOTE(review): several statements appear to be missing in this view - the logging calls are truncated to
// their argument lists, and the bodies of the try blocks around the listener callbacks and around the
// wait inside the loop are empty. Restore them from the upstream file before relying on this method.
public void takeLeadership(final CuratorFramework client) throws Exception {
// Remember the thread running this callback.
leaderThread = Thread.currentThread();
setLeader(true);"{} This node has been elected Leader for Role '{}'", this, roleName);
if (listener != null) {
// NOTE(review): empty try body; presumably the listener is notified of the election here - confirm.
try {
} catch (final Exception e) {
logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
logger.error("", e);
// Curator API states that we lose the leadership election when we return from this method,
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
try {
// Failed ZooKeeper verifications; reset to 0 after a successful verification.
int failureCount = 0;
// Loop iterations; every 50th iteration triggers a verification against ZooKeeper.
int loopCount = 0;
while (!isStopped() && leader) {
// NOTE(review): empty try body; presumably the thread waits here between iterations - confirm.
try {
} catch (final InterruptedException ie) {"{} has been interrupted; no longer leader for role '{}'", this, roleName);
if (leader && ++loopCount % 50 == 0) {
// While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
// seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
// periodically to determine whether or not this node is still the elected leader.
try {
final boolean stillLeader = verifyLeader();
failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
if (!stillLeader) {"According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
} catch (final Exception e) {
// NOTE(review): failureCount is never incremented in the visible lines, so this branch cannot be taken as shown.
if (failureCount > 1) {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
} else {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
} finally {
// Always clear the leader flag when leaving this method.
setLeader(false);"{} This node is no longer leader for role '{}'", this, roleName);
if (listener != null) {
// NOTE(review): empty try body; presumably the listener is told that leadership was relinquished - confirm.
try {
} catch (final Exception e) {
logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e.toString());
// The stack trace is logged (at ERROR level) only when debug logging is enabled.
if (logger.isDebugEnabled()) {
logger.error("", e);
/**
 * ZookeeperFactory that creates ZooKeeper clients using a ZKClientConfig populated with the
 * connection socket, secure-client flag, and key store / trust store settings taken from a
 * ZooKeeperClientConfig.
 */
public static class SecureClientZooKeeperFactory implements ZookeeperFactory {
// NOTE(review): the initializer of this constant is missing in this view; presumably it is the class name
// of ZooKeeper's Netty client connection socket - restore from the upstream file.
public static final String NETTY_CLIENT_CNXN_SOCKET =
// Client configuration passed to every ZooKeeper instance created by this factory.
private ZKClientConfig zkSecureClientConfig;
/**
 * @param zkConfig source of the connection socket, secure flag and key/trust store settings
 * @throws IllegalArgumentException if the configured connection socket is not NETTY_CLIENT_CNXN_SOCKET
 * @throws IllegalStateException if zkConfig does not report a secure client
 */
public SecureClientZooKeeperFactory(final ZooKeeperClientConfig zkConfig) {
this.zkSecureClientConfig = new ZKClientConfig();
// Netty is required for the secure client config.
final String cnxnSocket = zkConfig.getConnectionSocket();
if (!NETTY_CLIENT_CNXN_SOCKET.equals(cnxnSocket)) {
throw new IllegalArgumentException(String.format("connection factory set to '%s', %s required", String.valueOf(cnxnSocket), NETTY_CLIENT_CNXN_SOCKET));
zkSecureClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, cnxnSocket);
// This should never happen but won't get checked elsewhere.
final boolean clientSecure = zkConfig.isClientSecure();
if (!clientSecure) {
throw new IllegalStateException(String.format("%s set to '%b', expected true", ZKClientConfig.SECURE_CLIENT, clientSecure));
zkSecureClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, String.valueOf(clientSecure));
// ClientX509Util supplies the property names under which the key store and trust store settings are stored.
final X509Util clientX509util = new ClientX509Util();
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreLocationProperty(), zkConfig.getKeyStore());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreTypeProperty(), zkConfig.getKeyStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystorePasswdProperty(), zkConfig.getKeyStorePassword());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreLocationProperty(), zkConfig.getTrustStore());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreTypeProperty(), zkConfig.getTrustStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststorePasswdProperty(), zkConfig.getTrustStorePassword());
// Creates a ZooKeeperAdmin with the secure client configuration; the canBeReadOnly argument is not used.
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, zkSecureClientConfig);