blob: 5fe8153cc55474fbba15a9d19f459ad8d35c7f55 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.leader.election;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.timebuffer.CountSumMinMaxAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.apache.nifi.util.timebuffer.TimestampedLongAggregation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.common.ZKConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
public class CuratorLeaderElectionManager implements LeaderElectionManager {
private static final Logger logger = LoggerFactory.getLogger(CuratorLeaderElectionManager.class);
private final FlowEngine leaderElectionMonitorEngine;
private final ZooKeeperClientConfig zkConfig;
private CuratorFramework curatorClient;
private volatile boolean stopped = true;
private final ConcurrentMap<String, LeaderRole> leaderRoles = new ConcurrentHashMap<>();
private final ConcurrentMap<String, RegisteredRole> registeredRoles = new ConcurrentHashMap<>();
private final ConcurrentMap<String, TimedBuffer<TimestampedLong>> leaderChanges = new ConcurrentHashMap<>();
private final TimedBuffer<TimestampedLongAggregation> pollTimes = new TimedBuffer<>(TimeUnit.SECONDS, 300, new CountSumMinMaxAccess());
private final ConcurrentMap<String, String> lastKnownLeader = new ConcurrentHashMap<>();
public CuratorLeaderElectionManager(final int threadPoolSize, final NiFiProperties properties) {
leaderElectionMonitorEngine = new FlowEngine(threadPoolSize, "Leader Election Notification", true);
zkConfig = ZooKeeperClientConfig.createConfig(properties);
}
@Override
public synchronized void start() {
if (!stopped) {
return;
}
stopped = false;
curatorClient = createClient();
// Call #register for each already-registered role. This will
// cause us to start listening for leader elections for that
// role again
for (final Map.Entry<String, RegisteredRole> entry : registeredRoles.entrySet()) {
final RegisteredRole role = entry.getValue();
register(entry.getKey(), role.getListener(), role.getParticipantId());
}
logger.info("{} started", this);
}
@Override
public boolean isActiveParticipant(final String roleName) {
final RegisteredRole role = registeredRoles.get(roleName);
if (role == null) {
return false;
}
final String participantId = role.getParticipantId();
return participantId != null;
}
@Override
public void register(String roleName, LeaderElectionStateChangeListener listener) {
register(roleName, listener, null);
}
private String getElectionPath(final String roleName) {
final String rootPath = zkConfig.getRootPath();
final String leaderPath = rootPath + (rootPath.endsWith("/") ? "" : "/") + "leaders/" + roleName;
return leaderPath;
}
@Override
public synchronized void register(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
logger.debug("{} Registering new Leader Selector for role {}", this, roleName);
// If we already have a Leader Role registered and either the Leader Role is participating in election,
// or the given participant id == null (don't want to participant in election) then we're done.
final LeaderRole currentRole = leaderRoles.get(roleName);
if (currentRole != null && (currentRole.isParticipant() || participantId == null)) {
logger.info("{} Attempted to register Leader Election for role '{}' but this role is already registered", this, roleName);
return;
}
final String leaderPath = getElectionPath(roleName);
try {
PathUtils.validatePath(leaderPath);
} catch (final IllegalArgumentException e) {
throw new IllegalStateException("Cannot register leader election for role '" + roleName + "' because this is not a valid role name");
}
registeredRoles.put(roleName, new RegisteredRole(participantId, listener));
final boolean isParticipant = participantId != null && !participantId.trim().isEmpty();
if (!isStopped()) {
final ElectionListener electionListener = new ElectionListener(roleName, listener, participantId);
final LeaderSelector leaderSelector = new LeaderSelector(curatorClient, leaderPath, leaderElectionMonitorEngine, electionListener);
if (isParticipant) {
leaderSelector.autoRequeue();
leaderSelector.setId(participantId);
leaderSelector.start();
}
final LeaderRole leaderRole = new LeaderRole(leaderSelector, electionListener, isParticipant);
leaderRoles.put(roleName, leaderRole);
}
if (isParticipant) {
logger.info("{} Registered new Leader Selector for role {}; this node is an active participant in the election.", this, roleName);
} else {
logger.info("{} Registered new Leader Selector for role {}; this node is a silent observer in the election.", this, roleName);
}
}
@Override
public synchronized void unregister(final String roleName) {
registeredRoles.remove(roleName);
final LeaderRole leaderRole = leaderRoles.remove(roleName);
if (leaderRole == null) {
logger.info("Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
return;
}
final LeaderSelector leaderSelector = leaderRole.getLeaderSelector();
if (leaderSelector == null) {
logger.info("Cannot unregister Leader Election Role '{}' because that role is not registered", roleName);
return;
}
leaderRole.getElectionListener().disable();
leaderSelector.close();
logger.info("This node is no longer registered to be elected as the Leader for Role '{}'", roleName);
}
@Override
public synchronized void stop() {
stopped = true;
for (final Map.Entry<String, LeaderRole> entry : leaderRoles.entrySet()) {
final LeaderRole role = entry.getValue();
final LeaderSelector selector = role.getLeaderSelector();
try {
selector.close();
} catch (final Exception e) {
logger.warn("Failed to close Leader Selector for {}", entry.getKey(), e);
}
}
leaderRoles.clear();
if (curatorClient != null) {
curatorClient.close();
curatorClient = null;
}
logger.info("{} stopped and closed", this);
}
@Override
public boolean isStopped() {
return stopped;
}
@Override
public String toString() {
return "CuratorLeaderElectionManager[stopped=" + isStopped() + "]";
}
private LeaderRole getLeaderRole(final String roleName) {
return leaderRoles.get(roleName);
}
private void onLeaderChanged(final String roleName) {
final TimedBuffer<TimestampedLong> buffer = leaderChanges.computeIfAbsent(roleName, key -> new TimedBuffer<>(TimeUnit.HOURS, 24, new LongEntityAccess()));
buffer.add(new TimestampedLong(1L));
}
public Map<String, Integer> getLeadershipChangeCount(final long duration, final TimeUnit unit) {
final Map<String, Integer> leadershipChangesPerRole = new HashMap<>();
for (final Map.Entry<String, TimedBuffer<TimestampedLong>> entry : leaderChanges.entrySet()) {
final String roleName = entry.getKey();
final TimedBuffer<TimestampedLong> buffer = entry.getValue();
final TimestampedLong aggregateValue = buffer.getAggregateValue(System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(duration, unit));
final int leadershipChanges = aggregateValue.getValue().intValue();
leadershipChangesPerRole.put(roleName, leadershipChanges);
}
return leadershipChangesPerRole;
}
@Override
public boolean isLeader(final String roleName) {
final boolean activeParticipant = isActiveParticipant(roleName);
if (!activeParticipant) {
logger.debug("Node is not an active participant in election for role {} so cannot be leader", roleName);
return false;
}
final LeaderRole role = getLeaderRole(roleName);
if (role == null) {
logger.debug("Node is an active participant in election for role {} but there is no LeaderRole registered so this node cannot be leader", roleName);
return false;
}
return role.isLeader();
}
@Override
public String getLeader(final String roleName) {
if (isStopped()) {
return determineLeaderExternal(roleName);
}
final LeaderRole role = getLeaderRole(roleName);
if (role == null) {
return determineLeaderExternal(roleName);
}
final long startNanos = System.nanoTime();
Participant participant;
try {
participant = role.getLeaderSelector().getLeader();
} catch (Exception e) {
logger.debug("Unable to determine leader for role '{}'; returning null", roleName);
return null;
}
if (participant == null) {
return null;
}
final String participantId = participant.getId();
if (StringUtils.isEmpty(participantId)) {
return null;
}
registerPollTime(System.nanoTime() - startNanos);
final String previousLeader = lastKnownLeader.put(roleName, participantId);
if (previousLeader != null && !previousLeader.equals(participantId)) {
onLeaderChanged(roleName);
}
return participantId;
}
private void registerPollTime(final long nanos) {
synchronized (pollTimes) {
pollTimes.add(TimestampedLongAggregation.newValue(nanos));
}
}
public long getAveragePollTime(final TimeUnit timeUnit) {
final long averageNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null || aggregation.getCount() == 0) {
return 0L;
}
averageNanos = aggregation.getSum() / aggregation.getCount();
}
return timeUnit.convert(averageNanos, TimeUnit.NANOSECONDS);
}
public long getMinPollTime(final TimeUnit timeUnit) {
final long minNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
}
minNanos = aggregation.getMin();
}
return timeUnit.convert(minNanos, TimeUnit.NANOSECONDS);
}
public long getMaxPollTime(final TimeUnit timeUnit) {
final long maxNanos;
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
}
maxNanos = aggregation.getMax();
}
return timeUnit.convert(maxNanos, TimeUnit.NANOSECONDS);
}
@Override
public long getPollCount() {
synchronized (pollTimes) {
final TimestampedLongAggregation.TimestampedAggregation aggregation = pollTimes.getAggregateValue(0L).getAggregation();
if (aggregation == null) {
return 0L;
}
return aggregation.getCount();
}
}
/**
* Determines whether or not leader election has already begun for the role with the given name
*
* @param roleName the role of interest
* @return <code>true</code> if leader election has already begun, <code>false</code> if it has not or if unable to determine this.
*/
@Override
public boolean isLeaderElected(final String roleName) {
final String leaderAddress = determineLeaderExternal(roleName);
return !StringUtils.isEmpty(leaderAddress);
}
/**
* Use a new Curator client to determine which node is the elected leader for the given role.
*
* @param roleName the name of the role
* @return the id of the elected leader, or <code>null</code> if no leader has been selected or if unable to determine
* the leader from ZooKeeper
*/
private String determineLeaderExternal(final String roleName) {
final long start = System.nanoTime();
try (CuratorFramework client = createClient()) {
final LeaderSelectorListener electionListener = new LeaderSelectorListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
}
@Override
public void takeLeadership(CuratorFramework client) {
}
};
final String electionPath = getElectionPath(roleName);
// Note that we intentionally do not auto-requeue here, and we do not start the selector. We do not
// want to join the leader election. We simply want to observe.
final LeaderSelector selector = new LeaderSelector(client, electionPath, electionListener);
try {
final Participant leader = selector.getLeader();
return leader == null ? null : leader.getId();
} catch (final KeeperException.NoNodeException nne) {
// If there is no ZNode, then there is no elected leader.
return null;
} catch (final Exception e) {
logger.warn("Unable to determine the Elected Leader for role '{}' due to {}; assuming no leader has been elected", roleName, e.toString());
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
return null;
}
} finally {
registerPollTime(System.nanoTime() - start);
}
}
private CuratorFramework createClient() {
// Create a new client because we don't want to try indefinitely for this to occur.
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
final CuratorACLProviderFactory aclProviderFactory = new CuratorACLProviderFactory();
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.aclProvider(aclProviderFactory.create(zkConfig))
.defaultData(new byte[0]);
if (zkConfig.isClientSecure()) {
clientBuilder.zookeeperFactory(new SecureClientZooKeeperFactory(zkConfig));
}
final CuratorFramework client = clientBuilder.build();
client.start();
return client;
}
private static class LeaderRole {
private final LeaderSelector leaderSelector;
private final ElectionListener electionListener;
private final boolean participant;
public LeaderRole(final LeaderSelector leaderSelector, final ElectionListener electionListener, final boolean participant) {
this.leaderSelector = leaderSelector;
this.electionListener = electionListener;
this.participant = participant;
}
public LeaderSelector getLeaderSelector() {
return leaderSelector;
}
public ElectionListener getElectionListener() {
return electionListener;
}
public boolean isLeader() {
return electionListener.isLeader();
}
public boolean isParticipant() {
return participant;
}
}
private static class RegisteredRole {
private final LeaderElectionStateChangeListener listener;
private final String participantId;
public RegisteredRole(final String participantId, final LeaderElectionStateChangeListener listener) {
this.participantId = participantId;
this.listener = listener;
}
public LeaderElectionStateChangeListener getListener() {
return listener;
}
public String getParticipantId() {
return participantId;
}
}
private class ElectionListener extends LeaderSelectorListenerAdapter implements LeaderSelectorListener {
private final String roleName;
private final LeaderElectionStateChangeListener listener;
private final String participantId;
private volatile boolean leader;
private volatile Thread leaderThread;
private long leaderUpdateTimestamp = 0L;
private final long MAX_CACHE_MILLIS = TimeUnit.SECONDS.toMillis(5L);
public ElectionListener(final String roleName, final LeaderElectionStateChangeListener listener, final String participantId) {
this.roleName = roleName;
this.listener = listener;
this.participantId = participantId;
}
public void disable() {
logger.info("Election Listener for Role {} disabled", roleName);
setLeader(false);
if (leaderThread == null) {
logger.debug("Election Listener for Role {} disabled but there is no leader thread. Will not interrupt any threads.", roleName);
} else {
leaderThread.interrupt();
}
}
public synchronized boolean isLeader() {
if (leaderUpdateTimestamp < System.currentTimeMillis() - MAX_CACHE_MILLIS) {
try {
final long start = System.nanoTime();
final boolean zkLeader = verifyLeader();
final long nanos = System.nanoTime() - start;
setLeader(zkLeader);
logger.debug("Took {} nanoseconds to reach out to ZooKeeper in order to check whether or not this node is currently the leader for Role '{}'. ZooKeeper reported {}",
nanos, roleName, zkLeader);
} catch (final Exception e) {
logger.warn("Attempted to reach out to ZooKeeper to determine whether or not this node is the elected leader for Role '{}' but failed to communicate with ZooKeeper. " +
"Assuming that this node is not the leader.", roleName, e);
return false;
}
} else {
logger.debug("Checking if this node is leader for role {}: using cached response, returning {}", roleName, leader);
}
return leader;
}
private synchronized void setLeader(final boolean leader) {
this.leader = leader;
this.leaderUpdateTimestamp = System.currentTimeMillis();
}
@Override
public synchronized void stateChanged(final CuratorFramework client, final ConnectionState newState) {
logger.info("{} Connection State changed to {}", this, newState.name());
if (newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST) {
if (leader) {
logger.info("Because Connection State was changed to {}, will relinquish leadership for role '{}'", newState, roleName);
}
setLeader(false);
}
super.stateChanged(client, newState);
}
/**
* Reach out to ZooKeeper to verify that this node still is the leader. We do this because at times, a node will lose
* its position as leader but the Curator client will fail to notify us, perhaps due to network failure, etc.
*
* @return <code>true</code> if this node is still the elected leader according to ZooKeeper, false otherwise
*/
private boolean verifyLeader() {
final String leader = getLeader(roleName);
if (leader == null) {
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}' but found that there is no leader.", roleName);
setLeader(false);
return false;
}
final boolean match = leader.equals(participantId);
logger.debug("Reached out to ZooKeeper to determine which node is the elected leader for Role '{}'. Elected Leader = '{}', Participant ID = '{}', This Node Elected = {}",
roleName, leader, participantId, match);
setLeader(match);
return match;
}
@Override
public void takeLeadership(final CuratorFramework client) throws Exception {
leaderThread = Thread.currentThread();
setLeader(true);
logger.info("{} This node has been elected Leader for Role '{}'", this, roleName);
if (listener != null) {
try {
listener.onLeaderElection();
} catch (final Exception e) {
logger.error("This node was elected Leader for Role '{}' but failed to take leadership. Will relinquish leadership role. Failure was due to: {}", roleName, e);
setLeader(false);
Thread.sleep(1000L);
return;
}
}
// Curator API states that we lose the leadership election when we return from this method,
// so we will block as long as we are not interrupted or closed. Then, we will set leader to false.
try {
int failureCount = 0;
int loopCount = 0;
while (!isStopped() && leader) {
try {
Thread.sleep(100L);
} catch (final InterruptedException ie) {
logger.info("{} has been interrupted; no longer leader for role '{}'", this, roleName);
Thread.currentThread().interrupt();
return;
}
if (leader && ++loopCount % 50 == 0) {
// While Curator is supposed to interrupt this thread when we are no longer the leader, we have occasionally
// seen occurrences where the thread does not get interrupted. As a result, we will reach out to ZooKeeper
// periodically to determine whether or not this node is still the elected leader.
try {
final boolean stillLeader = verifyLeader();
failureCount = 0; // we got a response, so we were successful in communicating with zookeeper. Set failureCount back to 0.
if (!stillLeader) {
logger.info("According to ZooKeeper, this node is no longer the leader for Role '{}'. Will relinquish leadership.", roleName);
break;
}
} catch (final Exception e) {
failureCount++;
if (failureCount > 1) {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. This is the second failed attempt, so will relinquish leadership of this role.", roleName, e);
} else {
logger.warn("Attempted to reach out to ZooKeeper to verify that this node still is the elected leader for Role '{}' "
+ "but failed to communicate with ZooKeeper. Will wait a bit and attempt to communicate with ZooKeeper again before relinquishing this role.", roleName, e);
}
}
}
}
} finally {
setLeader(false);
logger.info("{} This node is no longer leader for role '{}'", this, roleName);
if (listener != null) {
try {
listener.onLeaderRelinquish();
} catch (final Exception e) {
logger.error("This node is no longer leader for role '{}' but failed to shutdown leadership responsibilities properly due to: {}", roleName, e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
}
}
}
public static class SecureClientZooKeeperFactory implements ZookeeperFactory {
public static final String NETTY_CLIENT_CNXN_SOCKET =
org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();
private final ZKClientConfig zkSecureClientConfig;
public SecureClientZooKeeperFactory(final ZooKeeperClientConfig zkConfig) {
this.zkSecureClientConfig = new ZKClientConfig();
// Netty is required for the secure client config.
final String cnxnSocket = zkConfig.getConnectionSocket();
if (!NETTY_CLIENT_CNXN_SOCKET.equals(cnxnSocket)) {
throw new IllegalArgumentException(String.format("connection factory set to '%s', %s required", cnxnSocket, NETTY_CLIENT_CNXN_SOCKET));
}
zkSecureClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, cnxnSocket);
// This should never happen but won't get checked elsewhere.
final boolean clientSecure = zkConfig.isClientSecure();
if (!clientSecure) {
throw new IllegalStateException(String.format("%s set to '%b', expected true", ZKClientConfig.SECURE_CLIENT, clientSecure));
}
zkSecureClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, String.valueOf(clientSecure));
final X509Util clientX509util = new ClientX509Util();
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreLocationProperty(), zkConfig.getKeyStore());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreTypeProperty(), zkConfig.getKeyStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystorePasswdProperty(), zkConfig.getKeyStorePassword());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreLocationProperty(), zkConfig.getTrustStore());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreTypeProperty(), zkConfig.getTrustStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststorePasswdProperty(), zkConfig.getTrustStorePassword());
zkSecureClientConfig.setProperty(ZKConfig.JUTE_MAXBUFFER, Integer.toString(zkConfig.getJuteMaxbuffer()));
}
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, zkSecureClientConfig);
}
}
}