blob: 79a4f50c3cc7be8d702e52d8b5fa5e494f383478 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.meta;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.MultiCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract ledger manager based on zookeeper, which provides common methods such as query zk nodes.
*/
public abstract class AbstractZkLedgerManager implements LedgerManager, Watcher {
private static final Logger LOG = LoggerFactory.getLogger(AbstractZkLedgerManager.class);
@VisibleForTesting
static final int ZK_CONNECT_BACKOFF_MS = 200;
private final LedgerMetadataSerDe serDe;
protected final AbstractConfiguration conf;
protected final ZooKeeper zk;
protected final String ledgerRootPath;
// ledger metadata listeners
protected final ConcurrentMap<Long, Set<LedgerMetadataListener>> listeners =
new ConcurrentHashMap<Long, Set<LedgerMetadataListener>>();
// we use this to prevent long stack chains from building up in callbacks
protected ScheduledExecutorService scheduler;
/**
* ReadLedgerMetadataTask class.
*/
protected class ReadLedgerMetadataTask implements Runnable {
final long ledgerId;
ReadLedgerMetadataTask(long ledgerId) {
this.ledgerId = ledgerId;
}
@Override
public void run() {
if (null != listeners.get(ledgerId)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Re-read ledger metadata for {}.", ledgerId);
}
readLedgerMetadata(ledgerId, AbstractZkLedgerManager.this)
.whenComplete((metadata, exception) -> handleMetadata(metadata, exception));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger metadata listener for ledger {} is already removed.", ledgerId);
}
}
}
private void handleMetadata(Versioned<LedgerMetadata> result, Throwable exception) {
if (exception == null) {
final Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
if (null != listenerSet) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ledger metadata is changed for {} : {}.", ledgerId, result);
}
scheduler.submit(() -> {
synchronized (listenerSet) {
for (LedgerMetadataListener listener : listenerSet) {
listener.onChanged(ledgerId, result);
}
}
});
}
} else if (BKException.getExceptionCode(exception)
== BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
// the ledger is removed, do nothing
Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
if (null != listenerSet) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removed ledger metadata listener set on ledger {} as its ledger is deleted : {}",
ledgerId, listenerSet.size());
}
// notify `null` as indicator that a ledger is deleted
// make this behavior consistent with `NodeDeleted` watched event.
synchronized (listenerSet) {
for (LedgerMetadataListener listener : listenerSet) {
listener.onChanged(ledgerId, null);
}
}
}
} else {
LOG.warn("Failed on read ledger metadata of ledger {}: {}",
ledgerId, BKException.getExceptionCode(exception));
scheduler.schedule(this, ZK_CONNECT_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}
}
/**
* ZooKeeper-based Ledger Manager Constructor.
*
* @param conf
* Configuration object
* @param zk
* ZooKeeper Client Handle
*/
protected AbstractZkLedgerManager(AbstractConfiguration conf, ZooKeeper zk) {
this.serDe = new LedgerMetadataSerDe();
this.conf = conf;
this.zk = zk;
this.ledgerRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
this.scheduler = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ZkLedgerManagerScheduler"));
if (LOG.isDebugEnabled()) {
LOG.debug("Using AbstractZkLedgerManager with root path : {}", ledgerRootPath);
}
}
/**
* Get the znode path that is used to store ledger metadata.
*
* @param ledgerId
* Ledger ID
* @return ledger node path
*/
protected abstract String getLedgerPath(long ledgerId);
/**
* Get ledger id from its znode ledger path.
*
* @param ledgerPath
* Ledger path to store metadata
* @return ledger id
* @throws IOException when the ledger path is invalid
*/
protected abstract long getLedgerId(String ledgerPath) throws IOException;
@Override
public void process(WatchedEvent event) {
LOG.debug("Received watched event {} from zookeeper based ledger manager.", event);
if (Event.EventType.None == event.getType()) {
if (Event.KeeperState.Expired == event.getState()) {
LOG.info("ZooKeeper client expired on ledger manager.");
Set<Long> keySet = new HashSet<Long>(listeners.keySet());
for (Long lid : keySet) {
scheduler.submit(new ReadLedgerMetadataTask(lid));
LOG.info("Re-read ledger metadata for {} after zookeeper session expired.", lid);
}
}
return;
}
String path = event.getPath();
if (null == path) {
return;
}
final long ledgerId;
try {
ledgerId = getLedgerId(event.getPath());
} catch (IOException ioe) {
LOG.info("Received invalid ledger path {} : ", event.getPath(), ioe);
return;
}
switch (event.getType()) {
case NodeDeleted:
Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
if (null != listenerSet) {
synchronized (listenerSet){
if (LOG.isDebugEnabled()) {
LOG.debug("Removed ledger metadata listeners on ledger {} : {}",
ledgerId, listenerSet);
}
for (LedgerMetadataListener l : listenerSet) {
l.onChanged(ledgerId, null);
}
listeners.remove(ledgerId, listenerSet);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No ledger metadata listeners to remove from ledger {} after it's deleted.", ledgerId);
}
}
break;
case NodeDataChanged:
new ReadLedgerMetadataTask(ledgerId).run();
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Received event {} on {}.", event.getType(), event.getPath());
}
break;
}
}
@Override
public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId,
LedgerMetadata inputMetadata) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
/*
* Create a random number and use it as creator token.
*/
final long cToken = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
final LedgerMetadata metadata;
if (inputMetadata.getMetadataFormatVersion() > LedgerMetadataSerDe.METADATA_FORMAT_VERSION_2) {
metadata = LedgerMetadataBuilder.from(inputMetadata).withCToken(cToken).build();
} else {
metadata = inputMetadata;
}
String ledgerPath = getLedgerPath(ledgerId);
StringCallback scb = new StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == Code.OK.intValue()) {
promise.complete(new Versioned<>(metadata, new LongVersion(0)));
} else if (rc == Code.NODEEXISTS.intValue()) {
LOG.info("Ledger metadata for {} appears to already exist, checking cToken",
ledgerId);
if (metadata.getMetadataFormatVersion() > 2) {
CompletableFuture<Versioned<LedgerMetadata>> readFuture = readLedgerMetadata(ledgerId);
readFuture.handle((readMetadata, exception) -> {
if (exception == null) {
if (readMetadata.getValue().getCToken() == cToken) {
FutureUtils.complete(promise, new Versioned<>(metadata, new LongVersion(0)));
} else {
LOG.warn("Failed to create ledger metadata for {} which already exists", ledgerId);
promise.completeExceptionally(new BKException.BKLedgerExistException());
}
} else if (exception instanceof KeeperException.NoNodeException) {
// This is a pretty strange case. We tried to create the node, found that it
// already exists, but failed to find it when we reread it. It's possible that
// we successfully created it, got an erroneous NODEEXISTS due to a resend,
// and then it got removed. It's also possible that we actually lost the race
// and then it got removed. I'd argue that returning an error here is the right
// path since recreating it is likely to cause problems.
LOG.warn("Ledger {} appears to have already existed and then been removed, failing"
+ " with LedgerExistException");
promise.completeExceptionally(new BKException.BKLedgerExistException());
} else {
LOG.error("Could not validate node for ledger {} after LedgerExistsException", ledgerId,
exception);
promise.completeExceptionally(new BKException.ZKException());
}
return null;
});
} else {
LOG.warn("Failed to create ledger metadata for {} which already exists", ledgerId);
promise.completeExceptionally(new BKException.BKLedgerExistException());
}
} else {
LOG.error("Could not create node for ledger {}", ledgerId,
KeeperException.create(Code.get(rc), path));
promise.completeExceptionally(new BKException.ZKException());
}
}
};
final byte[] data;
try {
data = serDe.serialize(metadata);
} catch (IOException ioe) {
promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
return promise;
}
List<ACL> zkAcls = ZkUtils.getACLs(conf);
ZkUtils.asyncCreateFullPathOptimistic(zk, ledgerPath, data, zkAcls,
CreateMode.PERSISTENT, scb, null);
return promise;
}
@Override
public CompletableFuture<Void> removeLedgerMetadata(final long ledgerId, final Version version) {
CompletableFuture<Void> promise = new CompletableFuture<>();
int znodeVersion = -1;
if (Version.NEW == version) {
LOG.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId);
promise.completeExceptionally(new BKException.BKMetadataVersionException());
return promise;
} else if (Version.ANY != version) {
if (!(version instanceof LongVersion)) {
LOG.info("Not an instance of ZKVersion: {}", ledgerId);
promise.completeExceptionally(new BKException.BKMetadataVersionException());
return promise;
} else {
znodeVersion = (int) ((LongVersion) version).getLongVersion();
}
}
VoidCallback callbackForDelete = new VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (rc == KeeperException.Code.NONODE.intValue()) {
LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}. Returning success.", ledgerId);
FutureUtils.complete(promise, null);
} else if (rc == KeeperException.Code.OK.intValue()) {
// removed listener on ledgerId
Set<LedgerMetadataListener> listenerSet = listeners.remove(ledgerId);
if (null != listenerSet) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Remove registered ledger metadata listeners on ledger {} after ledger is deleted.",
ledgerId, listenerSet);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("No ledger metadata listeners to remove from ledger {} when it's being deleted.",
ledgerId);
}
}
FutureUtils.complete(promise, null);
} else {
promise.completeExceptionally(new BKException.ZKException());
}
}
};
String ledgerZnodePath = getLedgerPath(ledgerId);
if (this instanceof HierarchicalLedgerManager || this instanceof LongHierarchicalLedgerManager) {
/*
* do recursive deletes only for HierarchicalLedgerManager and
* LongHierarchicalLedgerManager
*/
ZkUtils.asyncDeleteFullPathOptimistic(zk, ledgerZnodePath, znodeVersion, callbackForDelete,
ledgerZnodePath);
} else {
zk.delete(ledgerZnodePath, znodeVersion, callbackForDelete, null);
}
return promise;
}
@Override
public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
if (null != listener) {
if (LOG.isDebugEnabled()) {
LOG.debug("Registered ledger metadata listener {} on ledger {}.", listener, ledgerId);
}
Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
if (listenerSet == null) {
Set<LedgerMetadataListener> newListenerSet = new HashSet<LedgerMetadataListener>();
Set<LedgerMetadataListener> oldListenerSet = listeners.putIfAbsent(ledgerId, newListenerSet);
if (null != oldListenerSet) {
listenerSet = oldListenerSet;
} else {
listenerSet = newListenerSet;
}
}
synchronized (listenerSet) {
listenerSet.add(listener);
}
new ReadLedgerMetadataTask(ledgerId).run();
}
}
@Override
public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) {
Set<LedgerMetadataListener> listenerSet = listeners.get(ledgerId);
if (listenerSet != null) {
synchronized (listenerSet) {
if (listenerSet.remove(listener)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistered ledger metadata listener {} on ledger {}.", listener, ledgerId);
}
}
if (listenerSet.isEmpty()) {
listeners.remove(ledgerId, listenerSet);
}
}
}
}
@Override
public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
return readLedgerMetadata(ledgerId, null);
}
protected CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId, Watcher watcher) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
zk.getData(getLedgerPath(ledgerId), watcher, new DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (rc == KeeperException.Code.NONODE.intValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No such ledger: " + ledgerId,
KeeperException.create(KeeperException.Code.get(rc), path));
}
promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
return;
}
if (rc != KeeperException.Code.OK.intValue()) {
LOG.error("Could not read metadata for ledger: " + ledgerId,
KeeperException.create(KeeperException.Code.get(rc), path));
promise.completeExceptionally(new BKException.ZKException());
return;
}
if (stat == null) {
LOG.error("Could not parse ledger metadata for ledger: {}. Stat object is null", ledgerId);
promise.completeExceptionally(new BKException.ZKException());
return;
}
try {
LongVersion version = new LongVersion(stat.getVersion());
LedgerMetadata metadata = serDe.parseConfig(data, Optional.of(stat.getCtime()));
promise.complete(new Versioned<>(metadata, version));
} catch (Throwable t) {
LOG.error("Could not parse ledger metadata for ledger: {}", ledgerId, t);
promise.completeExceptionally(new BKException.ZKException());
}
}
}, null);
return promise;
}
@Override
public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata,
Version currentVersion) {
CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
if (!(currentVersion instanceof LongVersion)) {
promise.completeExceptionally(new BKException.BKMetadataVersionException());
return promise;
}
final LongVersion zv = (LongVersion) currentVersion;
final byte[] data;
try {
data = serDe.serialize(metadata);
} catch (IOException ioe) {
promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
return promise;
}
zk.setData(getLedgerPath(ledgerId),
data, (int) zv.getLongVersion(),
new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.BADVERSION.intValue() == rc) {
promise.completeExceptionally(new BKException.BKMetadataVersionException());
} else if (KeeperException.Code.OK.intValue() == rc) {
// update metadata version
promise.complete(new Versioned<>(metadata, new LongVersion(stat.getVersion())));
} else if (KeeperException.Code.NONODE.intValue() == rc) {
LOG.warn("Ledger node does not exist in ZooKeeper: ledgerId={}", ledgerId);
promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsOnMetadataServerException());
} else {
LOG.warn("Conditional update ledger metadata failed: {}", KeeperException.Code.get(rc));
promise.completeExceptionally(new BKException.ZKException());
}
}
}, null);
return promise;
}
/**
* Process ledgers in a single zk node.
*
* <p>
* for each ledger found in this zk node, processor#process(ledgerId) will be triggerred
* to process a specific ledger. after all ledgers has been processed, the finalCb will
* be called with provided context object. The RC passed to finalCb is decided by :
* <ul>
* <li> All ledgers are processed successfully, successRc will be passed.
* <li> Either ledger is processed failed, failureRc will be passed.
* </ul>
* </p>
*
* @param path
* Zk node path to store ledgers
* @param processor
* Processor provided to process ledger
* @param finalCb
* Callback object when all ledgers are processed
* @param ctx
* Context object passed to finalCb
* @param successRc
* RC passed to finalCb when all ledgers are processed successfully
* @param failureRc
* RC passed to finalCb when either ledger is processed failed
*/
protected void asyncProcessLedgersInSingleNode(
final String path, final Processor<Long> processor,
final AsyncCallback.VoidCallback finalCb, final Object ctx,
final int successRc, final int failureRc) {
ZkUtils.getChildrenInSingleNode(zk, path, new GenericCallback<List<String>>() {
@Override
public void operationComplete(int rc, List<String> ledgerNodes) {
if (Code.NONODE.intValue() == rc) {
finalCb.processResult(successRc, null, ctx);
return;
} else if (Code.OK.intValue() != rc) {
finalCb.processResult(failureRc, null, ctx);
return;
}
Set<Long> zkActiveLedgers = ledgerListToSet(ledgerNodes, path);
if (LOG.isDebugEnabled()) {
LOG.debug("Processing ledgers: {}", zkActiveLedgers);
}
// no ledgers found, return directly
if (zkActiveLedgers.size() == 0) {
finalCb.processResult(successRc, null, ctx);
return;
}
MultiCallback mcb = new MultiCallback(zkActiveLedgers.size(), finalCb, ctx,
successRc, failureRc);
// start loop over all ledgers
for (Long ledger : zkActiveLedgers) {
processor.process(ledger, mcb);
}
}
});
}
/**
* Whether the znode a special znode.
*
* @param znode
* Znode Name
* @return true if the znode is a special znode otherwise false
*/
public static boolean isSpecialZnode(String znode) {
if (BookKeeperConstants.AVAILABLE_NODE.equals(znode)
|| BookKeeperConstants.COOKIE_NODE.equals(znode)
|| BookKeeperConstants.LAYOUT_ZNODE.equals(znode)
|| BookKeeperConstants.INSTANCEID.equals(znode)
|| BookKeeperConstants.UNDER_REPLICATION_NODE.equals(znode)
|| LegacyHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
|| LongHierarchicalLedgerManager.IDGEN_ZNODE.equals(znode)
|| znode.startsWith(ZkLedgerIdGenerator.LEDGER_ID_GEN_PREFIX)) {
return true;
}
return false;
}
/**
* regex expression for name of top level parent znode for ledgers (in
* HierarchicalLedgerManager) or znode of a ledger (in FlatLedgerManager).
*
* @return
*/
protected abstract String getLedgerParentNodeRegex();
/**
* whether the child of ledgersRootPath is a top level parent znode for
* ledgers (in HierarchicalLedgerManager) or znode of a ledger (in
* FlatLedgerManager).
*
* @param znode
* Znode Name
* @return
*/
public boolean isLedgerParentNode(String znode) {
return znode.matches(getLedgerParentNodeRegex());
}
/**
* Convert the ZK retrieved ledger nodes to a HashSet for easier comparisons.
*
* @param ledgerNodes
* zk ledger nodes
* @param path
* the prefix path of the ledger nodes
* @return ledger id hash set
*/
protected NavigableSet<Long> ledgerListToSet(List<String> ledgerNodes, String path) {
NavigableSet<Long> zkActiveLedgers = new TreeSet<Long>();
for (String ledgerNode : ledgerNodes) {
if (isSpecialZnode(ledgerNode)) {
continue;
}
try {
// convert the node path to ledger id according to different ledger manager implementation
zkActiveLedgers.add(getLedgerId(path + "/" + ledgerNode));
} catch (IOException e) {
LOG.warn("Error extracting ledgerId from ZK ledger node: " + ledgerNode);
// This is a pretty bad error as it indicates a ledger node in ZK
// has an incorrect format. For now just continue and consider
// this as a non-existent ledger.
continue;
}
}
return zkActiveLedgers;
}
@Override
public void close() {
try {
scheduler.shutdown();
} catch (Exception e) {
LOG.warn("Error when closing zookeeper based ledger manager: ", e);
}
}
}