blob: 9171d9d6e48a0bf22f76f0cd2674393c8d4a4803 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkDataListener;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.LeaderElector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* An implementation of Leader Elector using Zookeeper.
*
* Each participant in the leader election process creates an instance of this class and tries to become the leader.
* The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader
* sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number
* that is less than the current participant's sequence number.
* </p>
* */
public class ZkLeaderElector implements LeaderElector {
public static final Logger LOG = LoggerFactory.getLogger(ZkLeaderElector.class);
private final ZkUtils zkUtils;
private final String processorIdStr;
private final ZkKeyBuilder keyBuilder;
private final String hostName;
private AtomicBoolean isLeader = new AtomicBoolean(false);
private IZkDataListener previousProcessorChangeListener;
private LeaderElectorListener leaderElectorListener = null;
private String currentSubscription = null;
private final Random random = new Random();
public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
this.keyBuilder = zkUtils.getKeyBuilder();
this.hostName = getHostName();
this.previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
zkUtils.validatePaths(new String[]{keyBuilder.getProcessorsPath()});
}
@VisibleForTesting
public ZkLeaderElector(String processorIdStr,
ZkUtils zkUtils,
IZkDataListener previousProcessorChangeListener) {
this.processorIdStr = processorIdStr;
this.zkUtils = zkUtils;
this.keyBuilder = zkUtils.getKeyBuilder();
this.hostName = getHostName();
this.previousProcessorChangeListener = previousProcessorChangeListener;
}
// TODO: This should go away once we integrate with Zk based Job Coordinator
private String getHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
LOG.error("Failed to fetch hostname of the processor", e);
throw new SamzaException(e);
}
}
/**
* Register a LeaderElectorListener
*
* @param listener {@link LeaderElectorListener} interfaces to be invoked upon completion of leader election participation
*/
@Override
public void setLeaderElectorListener(LeaderElectorListener listener) {
this.leaderElectorListener = listener;
}
/**
* Async method that helps the caller participate in leader election.
**/
@Override
public void tryBecomeLeader() {
String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
LOG.debug(zLog("Current active processors - " + children));
int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
LOG.info("tryBecomeLeader: index = " + index + " for path=" + currentPath + " out of " + Arrays.toString(children.toArray()));
if (children.size() == 0 || index == -1) {
throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
}
if (index == 0) {
isLeader.getAndSet(true);
LOG.info(zLog("Eligible to become the leader!"));
if (leaderElectorListener != null) {
leaderElectorListener.onBecomingLeader();
}
return;
}
isLeader.getAndSet(false);
LOG.info("Index = " + index + " Not eligible to be a leader yet!");
String predecessor = children.get(index - 1);
if (!predecessor.equals(currentSubscription)) {
if (currentSubscription != null) {
LOG.debug(zLog("Unsubscribing data change for " + currentSubscription));
zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
previousProcessorChangeListener = new PreviousProcessorChangeListener(zkUtils);
}
currentSubscription = predecessor;
LOG.info(zLog("Subscribing data change for " + predecessor));
zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
previousProcessorChangeListener);
}
/**
* Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
* on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't
* exist during subscription, it is not going to get created in the future.
*/
boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription);
if (predecessorExists) {
LOG.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader."));
} else {
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
Thread.interrupted();
}
LOG.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again..."));
tryBecomeLeader();
}
}
@Override
public void resignLeadership() {
isLeader.compareAndSet(true, false);
}
@Override
public boolean amILeader() {
return isLeader.get();
}
private String zLog(String logMessage) {
return String.format("[Processor-%s] %s", processorIdStr, logMessage);
}
class PreviousProcessorChangeListener extends ZkUtils.GenerationAwareZkDataListener {
public PreviousProcessorChangeListener(ZkUtils zkUtils) {
super(zkUtils, "PreviousProcessorChangeListener");
}
@Override
public void doHandleDataChange(String dataPath, Object data) {
LOG.info("Data change on path: {} for data: {}", dataPath, data);
}
@Override
public void doHandleDataDeleted(String dataPath) {
LOG.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader."));
tryBecomeLeader();
}
}
}