blob: a50261a9ae71aa54c1497e95b9f7822d3dc15bad [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bookkeeper.meta;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.DataFormats.CheckAllLedgersFormat;
import org.apache.bookkeeper.proto.DataFormats.LedgerRereplicationLayoutFormat;
import org.apache.bookkeeper.proto.DataFormats.LockDataFormat;
import org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
import org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
import org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import org.apache.bookkeeper.replication.ReplicationEnableCb;
import org.apache.bookkeeper.replication.ReplicationException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.SubTreeCache;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * ZooKeeper implementation of underreplication manager.
 * This is implemented in a hierarchical fashion, so it'll work with
 * FlatLedgerManagerFactory and HierarchicalLedgerManagerFactory.
 *
 * <p>Layout is:
 * <pre>
 * /root/underreplication/ LAYOUT
 *                         ledgers/(hierarchicalpath)/urL(ledgerId)
 *                         locks/urL(ledgerId)
 * </pre>
 * In both znode names the ledger id is written in decimal, zero-padded to at least 10 digits.
 *
 * <p>The hierarchical path is created by splitting the 64-bit ledger id into four
 * 2-byte segments, each written as four hexadecimal digits.
 * e.g. For ledger id 0xcafebeef0000feed, the path is
 * cafe/beef/0000/feed/
 */
public class ZkLedgerUnderreplicationManager implements LedgerUnderreplicationManager {
// Class-wide logger.
static final Logger LOG = LoggerFactory.getLogger(ZkLedgerUnderreplicationManager.class);
// Layout type and version written to the LAYOUT znode. checkLayout() rejects an
// existing layout whose type or version differs from these values.
static final String LAYOUT = "BASIC";
static final int LAYOUT_VERSION = 1;
// Payload written into every ephemeral underreplicated-ledger lock znode.
// Computed once, when the class is initialized.
private static final byte[] LOCK_DATA = getLockData();
private static class Lock {
private final String lockZNode;
private final int ledgerZNodeVersion;
Lock(String lockZNode, int ledgerZNodeVersion) {
this.lockZNode = lockZNode;
this.ledgerZNodeVersion = ledgerZNodeVersion;
String getLockZNode() {
return lockZNode;
int getLedgerZNodeVersion() {
return ledgerZNodeVersion;
/** Locks currently held by this manager, keyed by ledger id. */
private final Map<Long, Lock> heldLocks = new ConcurrentHashMap<>();

/** Extracts the numeric ledger id from an underreplicated-ledger znode name. */
private final Pattern idExtractionPattern;

// ZooKeeper client, the children cache built on top of it, and the configuration (ACLs etc.).
private final ZooKeeper zkc;
private final SubTreeCache subTreeCache;
private final AbstractConfiguration conf;

// Znode paths; all are computed from basePath in the constructor.
private final String basePath;
private final String layoutZNode;
private final String urLedgerPath;
private final String urLockPath;
private final String lostBookieRecoveryDelayZnode;
private final String checkAllLedgersCtimeZnode;
private final String placementPolicyCheckCtimeZnode;
private final String replicasCheckCtimeZnode;
/**
 * Creates an under-replication manager backed by ZooKeeper.
 *
 * <p>All znode paths are derived from the ledgers root path resolved from {@code conf}.
 *
 * <p>NOTE(review): this copy of the constructor looks truncated. The right-hand side of the
 * {@code urLedgerPath} assignment and the tail of the anonymous {@code TreeProvider} are
 * incomplete. No call to {@code checkLayout()} is visible, even though
 * {@code CompatibilityException} is declared. Confirm against upstream.
 *
 * @param conf configuration used to resolve the ledgers root path and the ZooKeeper ACLs
 * @param zkc ZooKeeper client used for all metadata access
 */
public ZkLedgerUnderreplicationManager(AbstractConfiguration conf, ZooKeeper zkc)
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
this.conf = conf;
basePath = getBasePath(ZKMetadataDriverBase.resolveZkLedgersRootPath(conf));
layoutZNode = basePath + '/' + BookKeeperConstants.LAYOUT_ZNODE;
// NOTE(review): the rest of this expression is missing in this copy.
urLedgerPath = basePath
urLockPath = basePath + '/' + BookKeeperConstants.UNDER_REPLICATION_LOCK;
lostBookieRecoveryDelayZnode = basePath + '/' + BookKeeperConstants.LOSTBOOKIERECOVERYDELAY_NODE;
checkAllLedgersCtimeZnode = basePath + '/' + BookKeeperConstants.CHECK_ALL_LEDGERS_CTIME;
placementPolicyCheckCtimeZnode = basePath + '/' + BookKeeperConstants.PLACEMENT_POLICY_CHECK_CTIME;
replicasCheckCtimeZnode = basePath + '/' + BookKeeperConstants.REPLICAS_CHECK_CTIME;
// Matches the trailing "urL<digits>" component of an underreplicated-ledger znode path.
idExtractionPattern = Pattern.compile("urL(\\d+)$");
this.zkc = zkc;
// The cache lists children through this same ZooKeeper client.
this.subTreeCache = new SubTreeCache(new SubTreeCache.TreeProvider() {
public List<String> getChildren(String path, Watcher watcher) throws InterruptedException, KeeperException {
return zkc.getChildren(path, watcher);
public static String getBasePath(String rootPath) {
return String.format("%s/%s", rootPath, BookKeeperConstants.UNDER_REPLICATION_NODE);
public static String getUrLockPath(String rootPath) {
return String.format("%s/%s", getBasePath(rootPath), BookKeeperConstants.UNDER_REPLICATION_LOCK);
/**
 * Builds the payload stored in each underreplicated-ledger lock znode.
 *
 * <p>The payload is a text-format LockDataFormat message returned as bytes.
 * getReplicationWorkerIdRereplicatingLedger parses it back and reads its bookie id.
 *
 * <p>NOTE(review): in this copy the builder population inside the {@code try}, the closing
 * of the catch block, and the argument to {@code printToString} are missing. Confirm
 * against upstream.
 *
 * @return the serialized lock data
 */
public static byte[] getLockData() {
LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder();
try {
} catch (UnknownHostException uhe) {
// If we can't get the address, ignore it: the field is optional
// in the data structure in any case.
return TextFormat.printToString(;
/**
 * Ensures the under-replication znode tree exists and has a compatible layout.
 *
 * <p>If they are missing, this creates the base znode, the LAYOUT znode (a text-format
 * LedgerRereplicationLayoutFormat), and the ledgers and locks parent znodes. Creation
 * races with other processes are tolerated. If a LAYOUT znode already exists, its type and
 * version must equal {@code LAYOUT} and {@code LAYOUT_VERSION}.
 *
 * <p>NOTE(review): this copy is missing several pieces. The builder population before the
 * LAYOUT create, the {@code printToString} argument and the {@code builder.build()} call
 * are absent. The exit from {@code while (true)} is not visible. Confirm against upstream.
 *
 * @throws ReplicationException.CompatibilityException if the stored layout differs or
 *     cannot be parsed
 */
private void checkLayout()
throws KeeperException, InterruptedException, ReplicationException.CompatibilityException {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
if (zkc.exists(basePath, false) == null) {
try {
zkc.create(basePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
// do nothing, someone else could have created it
while (true) {
if (zkc.exists(layoutZNode, false) == null) {
LedgerRereplicationLayoutFormat.Builder builder = LedgerRereplicationLayoutFormat.newBuilder();
try {
zkc.create(layoutZNode, TextFormat.printToString(,
zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nne) {
// someone else managed to create it
} else {
byte[] layoutData = zkc.getData(layoutZNode, false, null);
LedgerRereplicationLayoutFormat.Builder builder = LedgerRereplicationLayoutFormat.newBuilder();
try {
TextFormat.merge(new String(layoutData, UTF_8), builder);
LedgerRereplicationLayoutFormat layout =;
// Both the layout type and its version must match exactly.
if (!layout.getType().equals(LAYOUT)
|| layout.getVersion() != LAYOUT_VERSION) {
throw new ReplicationException.CompatibilityException(
"Incompatible layout found (" + LAYOUT + ":" + LAYOUT_VERSION + ")");
} catch (TextFormat.ParseException pe) {
throw new ReplicationException.CompatibilityException(
"Invalid data found", pe);
if (zkc.exists(urLedgerPath, false) == null) {
try {
zkc.create(urLedgerPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
// do nothing, someone else could have created it
if (zkc.exists(urLockPath, false) == null) {
try {
zkc.create(urLockPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException nee) {
// do nothing, someone else could have created it
/**
 * Extracts the ledger id from an underreplicated-ledger znode path or name.
 *
 * <p>Uses {@code idExtractionPattern}, which matches a trailing {@code urL<digits>}.
 * NOTE(review): the argument to {@code Long.parseLong} is missing in this copy (presumably
 * the first capture group). Confirm against upstream.
 *
 * @param path znode path or child name ending in {@code urL<digits>}
 * @return the parsed ledger id
 * @throws NumberFormatException if no ledger id can be found in {@code path}
 */
private long getLedgerId(String path) throws NumberFormatException {
Matcher m = idExtractionPattern.matcher(path);
if (m.find()) {
return Long.parseLong(;
} else {
throw new NumberFormatException("Couldn't find ledgerid in path");
public static String getParentZnodePath(String base, long ledgerId) {
String subdir1 = String.format("%04x", ledgerId >> 48 & 0xffff);
String subdir2 = String.format("%04x", ledgerId >> 32 & 0xffff);
String subdir3 = String.format("%04x", ledgerId >> 16 & 0xffff);
String subdir4 = String.format("%04x", ledgerId & 0xffff);
return String.format("%s/%s/%s/%s/%s",
base, subdir1, subdir2, subdir3, subdir4);
public static String getUrLedgerZnode(String base, long ledgerId) {
return String.format("%s/urL%010d", getParentZnodePath(base, ledgerId), ledgerId);
public static String getUrLedgerLockZnode(String base, long ledgerId) {
return String.format("%s/urL%010d", base, ledgerId);
private String getUrLedgerZnode(long ledgerId) {
return getUrLedgerZnode(urLedgerPath, ledgerId);
/**
 * Reads the underreplication record for a ledger.
 *
 * <p>Parses the text-format UnderreplicatedLedgerFormat stored in the ledger's
 * underreplicated znode. Returns {@code null} if that znode does not exist, meaning the
 * ledger is not marked underreplicated. A missing ctime is reported as
 * {@code UnderreplicatedLedger.UNASSIGNED_CTIME}.
 *
 * <p>NOTE(review): in this copy, {@code builder.build()} is missing. Also, {@code replicaList}
 * and {@code ctime} are computed but never applied to the returned object; presumably the
 * setter calls were lost. Confirm against upstream.
 *
 * @param ledgerId ledger to look up
 * @return the underreplication info, or {@code null} if the ledger is not marked
 * @throws ReplicationException.UnavailableException on ZooKeeper errors, interruption,
 *     or unparseable znode data
 */
public UnderreplicatedLedger getLedgerUnreplicationInfo(long ledgerId)
throws ReplicationException.UnavailableException {
try {
String znode = getUrLedgerZnode(ledgerId);
UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
byte[] data = null;
try {
data = zkc.getData(znode, false, null);
} catch (KeeperException.NoNodeException nne) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger: {} is not marked underreplicated", ledgerId);
return null;
TextFormat.merge(new String(data, UTF_8), builder);
UnderreplicatedLedgerFormat underreplicatedLedgerFormat =;
UnderreplicatedLedger underreplicatedLedger = new UnderreplicatedLedger(ledgerId);
List<String> replicaList = underreplicatedLedgerFormat.getReplicaList();
long ctime = (underreplicatedLedgerFormat.hasCtime() ? underreplicatedLedgerFormat.getCtime()
: UnderreplicatedLedger.UNASSIGNED_CTIME);
return underreplicatedLedger;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
} catch (TextFormat.ParseException pe) {
throw new ReplicationException.UnavailableException("Error parsing proto message", pe);
public CompletableFuture<Void> markLedgerUnderreplicatedAsync(long ledgerId, Collection<String> missingReplicas) {
if (LOG.isDebugEnabled()) {
LOG.debug("markLedgerUnderreplicated(ledgerId={}, missingReplica={})", ledgerId, missingReplicas);
final List<ACL> zkAcls = ZkUtils.getACLs(conf);
final String znode = getUrLedgerZnode(ledgerId);
final CompletableFuture<Void> createFuture = new CompletableFuture<>();
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, createFuture);
return createFuture;
/**
 * Attempts to create the underreplicated-ledger znode and completes {@code finalFuture}
 * with the outcome.
 *
 * <p>If the znode already exists, the work is handed to
 * {@code handleLedgerUnderreplicatedAlreadyMarked}. Any other non-OK result code fails the
 * future with the corresponding KeeperException.
 *
 * <p>NOTE(review): this copy is missing several pieces:
 * <ul>
 *   <li>the body of the mark-time branch;</li>
 *   <li>presumably, the code that adds {@code missingReplicas} to the builder;</li>
 *   <li>the {@code printToString} argument;</li>
 *   <li>the name of the asynchronous create call that takes the arguments on the following
 *       lines.</li>
 * </ul>
 * Confirm against upstream.
 *
 * @param znode underreplicated-ledger znode path
 * @param missingReplicas replicas to record as missing
 * @param zkAcls ACLs for newly created znodes
 * @param finalFuture future completed when marking succeeds or fails
 */
private void tryMarkLedgerUnderreplicatedAsync(final String znode,
final Collection<String> missingReplicas,
final List<ACL> zkAcls,
final CompletableFuture<Void> finalFuture) {
final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
final byte[] urLedgerData = TextFormat.printToString(;
zkc, znode, urLedgerData, zkAcls, CreateMode.PERSISTENT,
(rc, path, ctx, name) -> {
if (Code.OK.intValue() == rc) {
FutureUtils.complete(finalFuture, null);
} else if (Code.NODEEXISTS.intValue() == rc) {
// we need to handle the case where the ledger has been marked as underreplicated
handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(rc)));
}, null);
/**
 * Merges {@code missingReplicas} into an existing underreplicated-ledger znode.
 *
 * <p>Reads the znode. If none of {@code missingReplicas} is new, {@code finalFuture} is
 * completed immediately. Otherwise the updated record is written with a setData that is
 * conditional on the version just read. The outcomes of that write are handled as follows:
 * <ul>
 *   <li>NONODE: the znode was removed meanwhile, so creation is retried via
 *       {@code tryMarkLedgerUnderreplicatedAsync}.</li>
 *   <li>BADVERSION: a concurrent update happened, so this merge is retried.</li>
 *   <li>Any other error fails the future.</li>
 * </ul>
 *
 * <p>NOTE(review): several statements appear to be missing from this copy: the call that
 * completes the future on a parse error, {@code builder.build()}, the statement that adds a
 * new replica, the mark-time branch body, the {@code printToString} argument, and any early
 * returns. Confirm against upstream.
 *
 * @param znode underreplicated-ledger znode path
 * @param missingReplicas replicas to record as missing
 * @param zkAcls ACLs passed through if creation has to be retried
 * @param finalFuture future completed when the merge succeeds or fails
 */
private void handleLedgerUnderreplicatedAlreadyMarked(final String znode,
final Collection<String> missingReplicas,
final List<ACL> zkAcls,
final CompletableFuture<Void> finalFuture) {
// get the existing underreplicated ledger data
zkc.getData(znode, false, (getRc, getPath, getCtx, existingUrLedgerData, getStat) -> {
if (Code.OK.intValue() == getRc) {
// deserialize existing underreplicated ledger data
final UnderreplicatedLedgerFormat.Builder builder = UnderreplicatedLedgerFormat.newBuilder();
try {
TextFormat.merge(new String(existingUrLedgerData, UTF_8), builder);
} catch (ParseException e) {
// corrupted metadata in zookeeper
new ReplicationException.UnavailableException(
"Invalid underreplicated ledger data for ledger " + znode, e));
UnderreplicatedLedgerFormat existingUrLedgerFormat =;
boolean replicaAdded = false;
for (String missingReplica : missingReplicas) {
if (existingUrLedgerFormat.getReplicaList().contains(missingReplica)) {
} else {
replicaAdded = true;
if (!replicaAdded) { // no new missing replica is added
FutureUtils.complete(finalFuture, null);
if (conf.getStoreSystemTimeAsLedgerUnderreplicatedMarkTime()) {
final byte[] newUrLedgerData = TextFormat.printToString(;
// Conditional write: fails with BADVERSION if the znode changed since it was read.
zkc.setData(znode, newUrLedgerData, getStat.getVersion(), (setRc, setPath, setCtx, setStat) -> {
if (Code.OK.intValue() == setRc) {
FutureUtils.complete(finalFuture, null);
} else if (Code.NONODE.intValue() == setRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else if (Code.BADVERSION.intValue() == setRc) {
handleLedgerUnderreplicatedAlreadyMarked(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(setRc)));
}, null);
} else if (Code.NONODE.intValue() == getRc) {
tryMarkLedgerUnderreplicatedAsync(znode, missingReplicas, zkAcls, finalFuture);
} else {
FutureUtils.completeExceptionally(finalFuture, KeeperException.create(Code.get(getRc)));
}, null);
/**
 * Marks a ledger as fully replicated by deleting its underreplicated znode.
 *
 * <p>This only acts if this manager holds the ledger's lock. The delete is conditional on the
 * znode version recorded when the lock was taken. If the ledger was re-marked in the meantime
 * (BadVersionException), the znode is left in place. Afterwards, up to four levels of parent
 * znodes are deleted, stopping at the first one that is not empty.
 *
 * <p>NOTE(review): the body of the trailing {@code finally} block is missing from this copy
 * (presumably it releases the ledger lock). The interrupt flag is also not restored on
 * InterruptedException. Confirm against upstream.
 *
 * @param ledgerId ledger that has been re-replicated
 * @throws ReplicationException.UnavailableException on unexpected ZooKeeper errors or interruption
 */
public void markLedgerReplicated(long ledgerId) throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("markLedgerReplicated(ledgerId={})", ledgerId);
try {
Lock l = heldLocks.get(ledgerId);
if (l != null) {
zkc.delete(getUrLedgerZnode(ledgerId), l.getLedgerZNodeVersion());
try {
// clean up the hierarchy
String[] parts = getUrLedgerZnode(ledgerId).split("/");
// i = 1..4 yields the parent, grandparent, ... up to the four hex levels.
for (int i = 1; i <= 4; i++) {
String[] p = Arrays.copyOf(parts, parts.length - i);
String path = Joiner.on("/").join(p);
Stat s = zkc.exists(path, null);
if (s != null) {
zkc.delete(path, s.getVersion());
} catch (KeeperException.NotEmptyException nee) {
// This can happen when cleaning up the hierarchy.
// It's safe to ignore, it simply means another
// ledger in the same hierarchy has been marked as
// underreplicated.
} catch (KeeperException.NoNodeException nne) {
// this is ok
} catch (KeeperException.BadVersionException bve) {
// if this is the case, someone has marked the ledger
// for rereplication again. Leave the underreplicated
// znode in place, so the ledger is checked.
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger znode", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
} finally {
/**
 * Gets an iterator over all the underreplicated ledgers which have been
 * marked for rereplication, filtered by the predicate on the replicas list.
 *
 * <p>The replicas list of an underreplicated ledger is the list of the bookies which are part of
 * the ensemble of this ledger and are currently unavailable/down.
 *
 * @param predicate filter to use while listing underreplicated ledgers; {@code null} if filtering is not required.
 * @return an iterator which returns underreplicated ledgers.
 */
public Iterator<UnderreplicatedLedger> listLedgersToRereplicate(final Predicate<List<String>> predicate) {
// Paths still to be listed; paths are taken from the head, so the walk is breadth-first.
// NOTE(review): no statement seeding this queue (presumably with urLedgerPath) is visible in this copy.
final Queue<String> queue = new LinkedList<String>();
return new Iterator<UnderreplicatedLedger>() {
// Ledgers found by the most recent listing but not yet handed out by next().
final Queue<UnderreplicatedLedger> curBatch = new LinkedList<UnderreplicatedLedger>();
public void remove() {
throw new UnsupportedOperationException();
public boolean hasNext() {
if (curBatch.size() > 0) {
return true;
// Keep listing queued paths until a matching ledger is found or the walk is exhausted.
while (queue.size() > 0 && curBatch.size() == 0) {
String parent = queue.remove();
try {
for (String c : zkc.getChildren(parent, false)) {
String child = parent + "/" + c;
// "urL"-prefixed children are ledger entries; others are presumably hierarchy levels.
if (c.startsWith("urL")) {
long ledgerId = getLedgerId(child);
UnderreplicatedLedger underreplicatedLedger = getLedgerUnreplicationInfo(ledgerId);
// null means the ledger's znode disappeared after it was listed.
if (underreplicatedLedger != null) {
List<String> replicaList = underreplicatedLedger.getReplicaList();
// A null predicate accepts every ledger.
if ((predicate == null) || predicate.test(replicaList)) {
// NOTE(review): the statements adding the ledger to curBatch and the child path
// to the queue appear to be missing from this copy.
} else {
// Interruption ends the iteration; the thread's interrupt flag is not restored here.
} catch (InterruptedException ie) {
return false;
} catch (KeeperException.NoNodeException nne) {
// ignore: the node was removed concurrently
} catch (Exception e) {
throw new RuntimeException("Error reading list", e);
return curBatch.size() > 0;
// Call hasNext() first: the assert only fires with -ea; otherwise an empty batch makes
// LinkedList.remove() throw NoSuchElementException.
public UnderreplicatedLedger next() {
assert curBatch.size() > 0;
return curBatch.remove();
/**
 * Depth-first search of the underreplicated-ledger tree for a ledger this manager can lock.
 *
 * <p>Depth 4 is the leaf level of the four-level hex hierarchy, where each child is a ledger
 * entry. For each entry:
 * <ul>
 *   <li>entries already present under the locks znode are skipped;</li>
 *   <li>otherwise the entry's znode version is read and an ephemeral lock znode with the
 *       same name is created;</li>
 *   <li>the lock is then recorded in {@code heldLocks} together with that version.</li>
 * </ul>
 * Above depth 4, the method recurses into each child.
 *
 * <p>NOTE(review): in this copy nothing removes {@code tryChild} from {@code children} inside
 * either while loop, so as shown the loops would not advance. Statements such as the removal
 * and a {@code continue} appear to have been lost. Confirm against upstream.
 *
 * @param parent znode to search under
 * @param depth depth of {@code parent} below the ledgers znode (0 at the root)
 * @return the locked ledger id, or -1 if none could be locked
 */
private long getLedgerToRereplicateFromHierarchy(String parent, long depth)
throws KeeperException, InterruptedException {
if (depth == 4) {
List<String> children;
try {
children = subTreeCache.getChildren(parent);
} catch (KeeperException.NoNodeException nne) {
// can occur if another underreplicated ledger's
// hierarchy is being cleaned up
return -1;
List<ACL> zkAcls = ZkUtils.getACLs(conf);
while (children.size() > 0) {
String tryChild = children.get(0);
try {
// Lock znodes share the ledger entry's name, so a match means the ledger is already locked.
List<String> locks = subTreeCache.getChildren(urLockPath);
if (locks.contains(tryChild)) {
// Read the entry's version; it is recorded with the lock for the later conditional delete.
Stat stat = zkc.exists(parent + "/" + tryChild, false);
if (stat == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("{}/{} doesn't exist", parent, tryChild);
String lockPath = urLockPath + "/" + tryChild;
long ledgerId = getLedgerId(tryChild);
// EPHEMERAL: ZooKeeper removes the lock if this client's session ends.
zkc.create(lockPath, LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
heldLocks.put(ledgerId, new Lock(lockPath, stat.getVersion()));
return ledgerId;
// NodeExists: another worker created the lock first.
} catch (KeeperException.NodeExistsException nee) {
} catch (NumberFormatException nfe) {
return -1;
List<String> children;
try {
children = subTreeCache.getChildren(parent);
} catch (KeeperException.NoNodeException nne) {
// can occur if another underreplicated ledger's
// hierarchy is being cleaned up
return -1;
while (children.size() > 0) {
String tryChild = children.get(0);
String tryPath = parent + "/" + tryChild;
long ledger = getLedgerToRereplicateFromHierarchy(tryPath, depth + 1);
if (ledger != -1) {
return ledger;
return -1;
/**
 * Makes a single non-blocking attempt to lock an underreplicated ledger.
 *
 * @return the locked ledger id, or -1 if none is available
 * @throws ReplicationException.UnavailableException on ZooKeeper errors or interruption
 */
public long pollLedgerToRereplicate() throws ReplicationException.UnavailableException {
// NOTE(review): the body of this debug guard is missing from this copy.
if (LOG.isDebugEnabled()) {
try {
return getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
/**
 * Blocks until an underreplicated ledger can be locked, then returns its id.
 *
 * <p>Each iteration first registers a watcher with the subtree cache and then searches for a
 * lockable ledger. If none is found, it waits for a watcher to fire before retrying.
 *
 * <p>NOTE(review): this copy is missing the debug-guard body, the latch countdown and log
 * call in the watcher, and the wait on {@code changedLatch}. Confirm against upstream.
 *
 * @return the locked ledger id
 * @throws ReplicationException.UnavailableException on ZooKeeper errors or interruption
 */
public long getLedgerToRereplicate() throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
while (true) {
final CountDownLatch changedLatch = new CountDownLatch(1);
Watcher w = new Watcher() {
public void process(WatchedEvent e) {"Latch countdown due to ZK event: " + e);
// The guard unregisters the watcher when this attempt ends.
try (SubTreeCache.WatchGuard wg = subTreeCache.registerWatcherWithGuard(w)) {
long ledger = getLedgerToRereplicateFromHierarchy(urLedgerPath, 0);
if (ledger != -1) {
return ledger;
// nothing found, wait for a watcher to trigger
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
/**
 * Waits while automatic ledger replication is disabled.
 *
 * <p>NOTE(review): only the callback creation and the disabled check are visible in this copy.
 * The code that registers the callback and waits on it is missing. Confirm against upstream.
 */
private void waitIfLedgerReplicationDisabled() throws UnavailableException,
InterruptedException {
ReplicationEnableCb cb = new ReplicationEnableCb();
if (!this.isLedgerReplicationEnabled()) {
public void releaseUnderreplicatedLedger(long ledgerId) throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
LOG.debug("releaseLedger(ledgerId={})", ledgerId);
try {
Lock l = heldLocks.get(ledgerId);
if (l != null) {
zkc.delete(l.getLockZNode(), -1);
} catch (KeeperException.NoNodeException nne) {
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
public void close() throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
try {
for (Map.Entry<Long, Lock> e : heldLocks.entrySet()) {
zkc.delete(e.getValue().getLockZNode(), -1);
} catch (KeeperException.NoNodeException nne) {
// this is ok
} catch (KeeperException ke) {
LOG.error("Error deleting underreplicated ledger lock", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while connecting zookeeper", ie);
/**
 * Disables automatic ledger re-replication by creating the DISABLE_NODE znode.
 *
 * <p>NOTE(review): the debug-guard body is missing, and a log call appears to have been merged
 * into the create statement in this copy. Confirm against upstream.
 *
 * @throws ReplicationException.UnavailableException if replication is already disabled, on
 *     other ZooKeeper errors, or if interrupted
 */
public void disableLedgerReplication()
throws ReplicationException.UnavailableException {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
if (LOG.isDebugEnabled()) {
try {
String znode = basePath + '/' + BookKeeperConstants.DISABLE_NODE;
zkc.create(znode, "".getBytes(UTF_8), zkAcls, CreateMode.PERSISTENT);"Auto ledger re-replication is disabled!");
// The marker znode already exists, so replication was already disabled.
} catch (KeeperException.NodeExistsException ke) {
LOG.warn("AutoRecovery is already disabled!", ke);
throw new ReplicationException.UnavailableException(
"AutoRecovery is already disabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while stopping auto ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Exception while stopping auto ledger re-replication", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException(
"Interrupted while stopping auto ledger re-replication", ie);
/**
 * Re-enables automatic ledger re-replication by deleting the DISABLE_NODE znode.
 *
 * <p>NOTE(review): the debug-guard body is missing, and a log call appears to have been merged
 * into the delete statement in this copy. Confirm against upstream.
 *
 * @throws ReplicationException.UnavailableException if replication is already enabled, on
 *     other ZooKeeper errors, or if interrupted
 */
public void enableLedgerReplication()
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
try {
zkc.delete(basePath + '/' + BookKeeperConstants.DISABLE_NODE, -1);"Resuming automatic ledger re-replication");
// No marker znode, so replication was already enabled.
} catch (KeeperException.NoNodeException ke) {
LOG.warn("AutoRecovery is already enabled!", ke);
throw new ReplicationException.UnavailableException(
"AutoRecovery is already enabled!", ke);
} catch (KeeperException ke) {
LOG.error("Exception while resuming ledger replication", ke);
throw new ReplicationException.UnavailableException(
"Exception while resuming auto ledger re-replication", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException(
"Interrupted while resuming auto ledger re-replication", ie);
/**
 * Reports whether automatic ledger re-replication is enabled.
 *
 * <p>NOTE(review): the body of the debug guard is missing from this copy.
 *
 * @return {@code true} if the DISABLE_NODE znode does not exist
 * @throws ReplicationException.UnavailableException on ZooKeeper errors or interruption
 */
public boolean isLedgerReplicationEnabled()
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
try {
return null == zkc.exists(basePath + '/'
+ BookKeeperConstants.DISABLE_NODE, false);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException(
"Interrupted while contacting zookeeper", ie);
/**
 * Invokes {@code cb} once ledger replication is (or becomes) enabled.
 *
 * <p>A watcher is set on the DISABLE_NODE znode. When that znode is deleted, the watcher calls
 * {@code cb.operationComplete(0, null)}. If the znode does not exist at registration time,
 * {@code cb} is invoked immediately.
 *
 * <p>NOTE(review): the debug-guard body is missing, and log calls appear to have been merged
 * into the lines below in this copy. Confirm against upstream.
 *
 * @param cb callback to notify
 * @throws ReplicationException.UnavailableException on ZooKeeper errors or interruption
 */
public void notifyLedgerReplicationEnabled(final GenericCallback<Void> cb)
throws ReplicationException.UnavailableException {
if (LOG.isDebugEnabled()) {
Watcher w = new Watcher() {
public void process(WatchedEvent e) {
if (e.getType() == Watcher.Event.EventType.NodeDeleted) {"LedgerReplication is enabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is deleted");
cb.operationComplete(0, null);
try {
if (null == zkc.exists(basePath + '/'
+ BookKeeperConstants.DISABLE_NODE, w)) {"LedgerReplication is enabled externally through Zookeeper, "
+ "since DISABLE_NODE ZNode is deleted");
cb.operationComplete(0, null);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of "
+ "ledger re-replication", ke);
throw new ReplicationException.UnavailableException(
"Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException(
"Interrupted while contacting zookeeper", ie);
/**
 * Checks whether the ledger is being replicated by any bookie, i.e. whether its lock znode exists.
 */
public static boolean isLedgerBeingReplicated(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
throws KeeperException,
InterruptedException {
return zkc.exists(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), false) != null;
/**
 * Acquires the underreplicated ledger lock.
 */
public static void acquireUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath,
long ledgerId, List<ACL> zkAcls)
throws KeeperException, InterruptedException {
ZkUtils.createFullPathOptimistic(zkc, getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId),
LOCK_DATA, zkAcls, CreateMode.EPHEMERAL);
/**
 * Releases the underreplicated ledger lock if it exists.
 */
public static void releaseUnderreplicatedLedgerLock(ZooKeeper zkc, String zkLedgersRootPath, long ledgerId)
throws InterruptedException, KeeperException {
if (isLedgerBeingReplicated(zkc, zkLedgersRootPath, ledgerId)) {
zkc.delete(getUrLedgerLockZnode(getUrLockPath(zkLedgersRootPath), ledgerId), -1);
/**
 * Stores the initial lost-bookie recovery delay if none is stored yet.
 *
 * <p>The value is stored as the UTF-8 bytes of its decimal string.
 *
 * <p>NOTE(review): in this copy the remaining arguments to {@code zkc.create} are truncated,
 * and a log call appears merged into the NodeExists handler. Confirm against upstream.
 *
 * @param lostBookieRecoveryDelay delay value to store
 * @return {@code true} if the znode was created; {@code false} if it already existed or
 *     its parent znode is missing
 * @throws UnavailableException on other ZooKeeper errors or interruption
 */
public boolean initializeLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException {
try {
zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
} catch (KeeperException.NodeExistsException ke) {"lostBookieRecoveryDelay Znode is already present, so using "
+ "existing lostBookieRecoveryDelay Znode value");
return false;
} catch (KeeperException.NoNodeException nne) {
LOG.error("lostBookieRecoveryDelay Znode not found. Please verify if Auditor has been initialized.", nne);
return false;
} catch (KeeperException ke) {
LOG.error("Error while initializing LostBookieRecoveryDelay", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
return true;
/**
 * Stores the lost-bookie recovery delay, overwriting the znode or creating it if absent.
 *
 * <p>The value is stored as the UTF-8 bytes of its decimal string.
 * NOTE(review): the trailing arguments of the {@code setData} and {@code create} calls are
 * truncated in this copy. Confirm against upstream.
 *
 * @param lostBookieRecoveryDelay delay value to store
 * @throws UnavailableException on ZooKeeper errors or interruption
 */
public void setLostBookieRecoveryDelay(int lostBookieRecoveryDelay) throws UnavailableException {
try {
if (zkc.exists(lostBookieRecoveryDelayZnode, false) != null) {
zkc.setData(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
} else {
zkc.create(lostBookieRecoveryDelayZnode, Integer.toString(lostBookieRecoveryDelay).getBytes(UTF_8),
} catch (KeeperException ke) {
LOG.error("Error while setting LostBookieRecoveryDelay ", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
public int getLostBookieRecoveryDelay() throws UnavailableException {
try {
byte[] data = zkc.getData(lostBookieRecoveryDelayZnode, false, null);
return Integer.parseInt(new String(data, UTF_8));
} catch (KeeperException ke) {
LOG.error("Error while getting LostBookieRecoveryDelay ", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
/**
 * Registers {@code cb} to be notified when the lost-bookie recovery delay changes.
 *
 * <p>The watcher calls {@code cb.operationComplete(0, null)} on a NodeDataChanged event. If
 * the znode does not exist when the watch is set, {@code cb} is invoked immediately instead.
 *
 * <p>NOTE(review): the anonymous Watcher's closing braces are not visible in this copy.
 *
 * @param cb callback to notify
 * @throws UnavailableException on ZooKeeper errors or interruption
 */
public void notifyLostBookieRecoveryDelayChanged(GenericCallback<Void> cb) throws UnavailableException {
Watcher w = new Watcher() {
public void process(WatchedEvent e) {
if (e.getType() == Watcher.Event.EventType.NodeDataChanged) {
cb.operationComplete(0, null);
try {
if (null == zkc.exists(lostBookieRecoveryDelayZnode, w)) {
cb.operationComplete(0, null);
} catch (KeeperException ke) {
LOG.error("Error while checking the state of lostBookieRecoveryDelay", ke);
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
/**
 * Returns the bookie id recorded in a ledger's lock znode, i.e. the worker currently
 * re-replicating it.
 *
 * <p>The lock data is parsed as a text-format LockDataFormat.
 * NOTE(review): {@code lockDataBuilder.build()} is missing from this copy.
 *
 * @param ledgerId ledger to query
 * @return the worker's bookie id, or {@code null} if the ledger is not locked
 * @throws ReplicationException.UnavailableException on ZooKeeper errors, interruption,
 *     or unparseable lock data
 */
public String getReplicationWorkerIdRereplicatingLedger(long ledgerId)
throws ReplicationException.UnavailableException {
String replicationWorkerId = null;
try {
byte[] lockData = zkc.getData(getUrLedgerLockZnode(urLockPath, ledgerId), false, null);
LockDataFormat.Builder lockDataBuilder = LockDataFormat.newBuilder();
TextFormat.merge(new String(lockData, UTF_8), lockDataBuilder);
LockDataFormat lock =;
replicationWorkerId = lock.getBookieId();
} catch (KeeperException.NoNodeException e) {
// this is ok: no lock means nobody is re-replicating the ledger
} catch (KeeperException e) {
LOG.error("Error while getting ReplicationWorkerId rereplicating Ledger", e);
throw new ReplicationException.UnavailableException(
"Error while getting ReplicationWorkerId rereplicating Ledger", e);
} catch (InterruptedException e) {
LOG.error("Got interrupted while getting ReplicationWorkerId rereplicating Ledger", e);
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", e);
} catch (ParseException e) {
LOG.error("Error while parsing ZK data of lock", e);
throw new ReplicationException.UnavailableException("Error while parsing ZK data of lock", e);
return replicationWorkerId;
/**
 * Stores the ctime of the last check-all-ledgers run, creating the znode if needed.
 *
 * <p>An existing znode is overwritten unconditionally (version -1).
 * NOTE(review): in this copy, the debug-guard body, the builder setter and the serialization
 * expression assigned to {@code checkAllLedgersFormatByteArray} are missing. That expression is
 * presumably binary protobuf, matching {@code parseFrom} in the getter. Confirm against upstream.
 *
 * @param checkAllLedgersCTime ctime to record
 * @throws UnavailableException on ZooKeeper errors or interruption
 */
public void setCheckAllLedgersCTime(long checkAllLedgersCTime) throws UnavailableException {
if (LOG.isDebugEnabled()) {
try {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
CheckAllLedgersFormat.Builder builder = CheckAllLedgersFormat.newBuilder();
byte[] checkAllLedgersFormatByteArray =;
if (zkc.exists(checkAllLedgersCtimeZnode, false) != null) {
zkc.setData(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, -1);
} else {
zkc.create(checkAllLedgersCtimeZnode, checkAllLedgersFormatByteArray, zkAcls, CreateMode.PERSISTENT);
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
/**
 * Reads the ctime recorded for the last check-all-ledgers run.
 *
 * <p>The znode holds a binary CheckAllLedgersFormat protobuf.
 * NOTE(review): the body of the debug guard is missing from this copy.
 *
 * @return the recorded ctime, or -1 if the znode does not exist or has no ctime field
 * @throws UnavailableException on ZooKeeper errors, interruption, or unparseable data
 */
public long getCheckAllLedgersCTime() throws UnavailableException {
if (LOG.isDebugEnabled()) {
try {
byte[] data = zkc.getData(checkAllLedgersCtimeZnode, false, null);
CheckAllLedgersFormat checkAllLedgersFormat = CheckAllLedgersFormat.parseFrom(data);
return checkAllLedgersFormat.hasCheckAllLedgersCTime() ? checkAllLedgersFormat.getCheckAllLedgersCTime()
: -1;
} catch (KeeperException.NoNodeException ne) {
LOG.warn("checkAllLedgersCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
} catch (InvalidProtocolBufferException ipbe) {
throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
/**
 * Stores the ctime of the last placement-policy check, creating the znode if needed.
 *
 * <p>An existing znode is overwritten unconditionally (version -1).
 * NOTE(review): in this copy, the debug-guard body, the builder setter, the serialization
 * expression and the trailing arguments of {@code create} are missing. Confirm against upstream.
 *
 * @param placementPolicyCheckCTime ctime to record
 * @throws UnavailableException on ZooKeeper errors or interruption
 */
public void setPlacementPolicyCheckCTime(long placementPolicyCheckCTime) throws UnavailableException {
if (LOG.isDebugEnabled()) {
try {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
PlacementPolicyCheckFormat.Builder builder = PlacementPolicyCheckFormat.newBuilder();
byte[] placementPolicyCheckFormatByteArray =;
if (zkc.exists(placementPolicyCheckCtimeZnode, false) != null) {
zkc.setData(placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, -1);
} else {
zkc.create(placementPolicyCheckCtimeZnode, placementPolicyCheckFormatByteArray, zkAcls,
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
/**
 * Reads the ctime recorded for the last placement-policy check.
 *
 * <p>The znode holds a binary PlacementPolicyCheckFormat protobuf.
 * NOTE(review): the body of the debug guard is missing from this copy.
 *
 * @return the recorded ctime, or -1 if the znode does not exist or has no ctime field
 * @throws UnavailableException on ZooKeeper errors, interruption, or unparseable data
 */
public long getPlacementPolicyCheckCTime() throws UnavailableException {
if (LOG.isDebugEnabled()) {
try {
byte[] data = zkc.getData(placementPolicyCheckCtimeZnode, false, null);
PlacementPolicyCheckFormat placementPolicyCheckFormat = PlacementPolicyCheckFormat.parseFrom(data);
return placementPolicyCheckFormat.hasPlacementPolicyCheckCTime()
? placementPolicyCheckFormat.getPlacementPolicyCheckCTime() : -1;
} catch (KeeperException.NoNodeException ne) {
LOG.warn("placementPolicyCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
} catch (InvalidProtocolBufferException ipbe) {
throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);
/**
 * Stores the ctime of the last replicas check, creating the znode if needed.
 *
 * <p>An existing znode is overwritten unconditionally (version -1).
 * NOTE(review): the builder setter and the serialization expression assigned to
 * {@code replicasCheckFormatByteArray} are missing in this copy. Confirm against upstream.
 *
 * @param replicasCheckCTime ctime to record
 * @throws UnavailableException on ZooKeeper errors or interruption
 */
public void setReplicasCheckCTime(long replicasCheckCTime) throws UnavailableException {
try {
List<ACL> zkAcls = ZkUtils.getACLs(conf);
ReplicasCheckFormat.Builder builder = ReplicasCheckFormat.newBuilder();
byte[] replicasCheckFormatByteArray =;
if (zkc.exists(replicasCheckCtimeZnode, false) != null) {
zkc.setData(replicasCheckCtimeZnode, replicasCheckFormatByteArray, -1);
} else {
zkc.create(replicasCheckCtimeZnode, replicasCheckFormatByteArray, zkAcls, CreateMode.PERSISTENT);
if (LOG.isDebugEnabled()) {
LOG.debug("setReplicasCheckCTime completed successfully");
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
/**
 * Reads the ctime recorded for the last replicas check.
 *
 * <p>The znode holds a binary ReplicasCheckFormat protobuf.
 * NOTE(review): the interrupt flag is not restored on InterruptedException here.
 *
 * @return the recorded ctime, or -1 if the znode does not exist or has no ctime field
 * @throws UnavailableException on ZooKeeper errors, interruption, or unparseable data
 */
public long getReplicasCheckCTime() throws UnavailableException {
try {
byte[] data = zkc.getData(replicasCheckCtimeZnode, false, null);
ReplicasCheckFormat replicasCheckFormat = ReplicasCheckFormat.parseFrom(data);
if (LOG.isDebugEnabled()) {
LOG.debug("getReplicasCheckCTime completed successfully");
return replicasCheckFormat.hasReplicasCheckCTime() ? replicasCheckFormat.getReplicasCheckCTime() : -1;
} catch (KeeperException.NoNodeException ne) {
LOG.warn("replicasCheckCtimeZnode is not yet available");
return -1;
} catch (KeeperException ke) {
throw new ReplicationException.UnavailableException("Error contacting zookeeper", ke);
} catch (InterruptedException ie) {
throw new ReplicationException.UnavailableException("Interrupted while contacting zookeeper", ie);
} catch (InvalidProtocolBufferException ipbe) {
throw new ReplicationException.UnavailableException("Error while parsing ZK protobuf binary data", ipbe);