blob: 951bb9581ca2e2cb7dd2d91d6ef8493851c8f79d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.discover;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY;
import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException;
import org.apache.bookkeeper.bookie.BookieException.CookieExistException;
import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException;
import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BKException.BKInterruptedException;
import org.apache.bookkeeper.client.BKException.MetaStoreException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory;
import org.apache.bookkeeper.meta.LayoutManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.ZkLayoutManager;
import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper Based {@link RegistrationManager}.
*/
@Slf4j
public class ZKRegistrationManager implements RegistrationManager {
private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> {
if (cause instanceof BKException) {
log.error("Failed to get bookie list : ", cause);
return (BKException) cause;
} else if (cause instanceof InterruptedException) {
log.error("Interrupted reading bookie list : ", cause);
return new BKInterruptedException();
} else {
return new MetaStoreException();
}
};
private final ServerConfiguration conf;
private final ZooKeeper zk;
private final List<ACL> zkAcls;
private final LayoutManager layoutManager;
private volatile boolean zkRegManagerInitialized = false;
// ledgers root path
private final String ledgersRootPath;
// cookie path
private final String cookiePath;
// registration paths
protected final String bookieRegistrationPath;
protected final String bookieReadonlyRegistrationPath;
// session timeout in milliseconds
private final int zkTimeoutMs;
public ZKRegistrationManager(ServerConfiguration conf,
ZooKeeper zk,
RegistrationListener listener) {
this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf), listener);
}
public ZKRegistrationManager(ServerConfiguration conf,
ZooKeeper zk,
String ledgersRootPath,
RegistrationListener listener) {
this.conf = conf;
this.zk = zk;
this.zkAcls = ZkUtils.getACLs(conf);
this.ledgersRootPath = ledgersRootPath;
this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
this.zkTimeoutMs = conf.getZkTimeout();
this.layoutManager = new ZkLayoutManager(
zk,
ledgersRootPath,
zkAcls);
this.zk.register(event -> {
if (!zkRegManagerInitialized) {
// do nothing until first registration
return;
}
// Check for expired connection.
if (event.getType().equals(EventType.None)
&& event.getState().equals(KeeperState.Expired)) {
listener.onRegistrationExpired();
}
});
}
@Override
public void close() {
// no-op
}
/**
* Returns the CookiePath of the bookie in the ZooKeeper.
*
* @param bookieId bookie id
* @return
*/
public String getCookiePath(String bookieId) {
return this.cookiePath + "/" + bookieId;
}
//
// Registration Management
//
/**
* Check existence of <i>regPath</i> and wait it expired if possible.
*
* @param regPath reg node path.
* @return true if regPath exists, otherwise return false
* @throws IOException if can't create reg path
*/
protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException {
final CountDownLatch prevNodeLatch = new CountDownLatch(1);
Watcher zkPrevRegNodewatcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
// Check for prev znode deletion. Connection expiration is
// not handling, since bookie has logic to shutdown.
if (EventType.NodeDeleted == event.getType()) {
prevNodeLatch.countDown();
}
}
};
try {
Stat stat = zk.exists(regPath, zkPrevRegNodewatcher);
if (null != stat) {
// if the ephemeral owner isn't current zookeeper client
// wait for it to be expired.
if (stat.getEphemeralOwner() != zk.getSessionId()) {
log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:"
+ " {} ms for znode deletion", regPath, zkTimeoutMs);
// waiting for the previous bookie reg znode deletion
if (!prevNodeLatch.await(zkTimeoutMs, TimeUnit.MILLISECONDS)) {
throw new NodeExistsException(regPath);
} else {
return false;
}
}
return true;
} else {
return false;
}
} catch (KeeperException ke) {
log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke);
throw new IOException("ZK exception checking and wait ephemeral znode "
+ regPath + " expired", ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie);
throw new IOException("Interrupted checking and wait ephemeral znode "
+ regPath + " expired", ie);
}
}
@Override
public void registerBookie(String bookieId, boolean readOnly,
BookieServiceInfo bookieServiceInfo) throws BookieException {
if (!readOnly) {
String regPath = bookieRegistrationPath + "/" + bookieId;
doRegisterBookie(regPath, bookieServiceInfo);
} else {
doRegisterReadOnlyBookie(bookieId, bookieServiceInfo);
}
}
@VisibleForTesting
static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) {
if (log.isDebugEnabled()) {
log.debug("serialize BookieServiceInfo {}", bookieServiceInfo);
}
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
BookieServiceInfoFormat.Builder builder = BookieServiceInfoFormat.newBuilder();
List<BookieServiceInfoFormat.Endpoint> bsiEndpoints = bookieServiceInfo.getEndpoints().stream()
.map(e -> {
return BookieServiceInfoFormat.Endpoint.newBuilder()
.setId(e.getId())
.setPort(e.getPort())
.setHost(e.getHost())
.setProtocol(e.getProtocol())
.addAllAuth(e.getAuth())
.addAllExtensions(e.getExtensions())
.build();
})
.collect(Collectors.toList());
builder.addAllEndpoints(bsiEndpoints);
builder.putAllProperties(bookieServiceInfo.getProperties());
builder.build().writeTo(os);
return os.toByteArray();
} catch (IOException err) {
log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo);
throw new RuntimeException(err);
}
}
private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException {
// ZK ephemeral node for this Bookie.
try {
if (!checkRegNodeAndWaitExpired(regPath)) {
// Create the ZK ephemeral node for this Bookie.
zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL);
zkRegManagerInitialized = true;
}
} catch (KeeperException ke) {
log.error("ZK exception registering ephemeral Znode for Bookie!", ke);
// Throw an IOException back up. This will cause the Bookie
// constructor to error out. Alternatively, we could do a System
// exit here as this is a fatal error.
throw new MetadataStoreException(ke);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie);
// Throw an IOException back up. This will cause the Bookie
// constructor to error out. Alternatively, we could do a System
// exit here as this is a fatal error.
throw new MetadataStoreException(ie);
} catch (IOException e) {
throw new MetadataStoreException(e);
}
}
private void doRegisterReadOnlyBookie(String bookieId, BookieServiceInfo bookieServiceInfo) throws BookieException {
try {
if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) {
try {
zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo),
zkAcls, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// this node is just now created by someone.
}
}
String regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
doRegisterBookie(regPath, bookieServiceInfo);
// clear the write state
regPath = bookieRegistrationPath + "/" + bookieId;
try {
// Clear the current registered node
zk.delete(regPath, -1);
} catch (KeeperException.NoNodeException nne) {
log.warn("No writable bookie registered node {} when transitioning to readonly",
regPath, nne);
}
} catch (KeeperException | InterruptedException e) {
throw new MetadataStoreException(e);
}
}
@Override
public void unregisterBookie(String bookieId, boolean readOnly) throws BookieException {
String regPath;
if (!readOnly) {
regPath = bookieRegistrationPath + "/" + bookieId;
} else {
regPath = bookieReadonlyRegistrationPath + "/" + bookieId;
}
doUnregisterBookie(regPath);
}
private void doUnregisterBookie(String regPath) throws BookieException {
try {
zk.delete(regPath, -1);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new MetadataStoreException(ie);
} catch (KeeperException e) {
throw new MetadataStoreException(e);
}
}
//
// Cookie Management
//
@Override
public void writeCookie(String bookieId,
Versioned<byte[]> cookieData) throws BookieException {
String zkPath = getCookiePath(bookieId);
try {
if (Version.NEW == cookieData.getVersion()) {
if (zk.exists(cookiePath, false) == null) {
try {
zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT);
} catch (NodeExistsException nne) {
log.info("More than one bookie tried to create {} at once. Safe to ignore.",
cookiePath);
}
}
zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT);
} else {
if (!(cookieData.getVersion() instanceof LongVersion)) {
throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
}
zk.setData(
zkPath,
cookieData.getValue(),
(int) ((LongVersion) cookieData.getVersion()).getLongVersion());
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie);
} catch (NoNodeException nne) {
throw new CookieNotFoundException(bookieId);
} catch (NodeExistsException nee) {
throw new CookieExistException(bookieId);
} catch (KeeperException e) {
throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId);
}
}
@Override
public Versioned<byte[]> readCookie(String bookieId) throws BookieException {
String zkPath = getCookiePath(bookieId);
try {
Stat stat = zk.exists(zkPath, false);
byte[] data = zk.getData(zkPath, false, stat);
// sets stat version from ZooKeeper
LongVersion version = new LongVersion(stat.getVersion());
return new Versioned<>(data, version);
} catch (NoNodeException nne) {
throw new CookieNotFoundException(bookieId);
} catch (KeeperException | InterruptedException e) {
throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId);
}
}
@Override
public void removeCookie(String bookieId, Version version) throws BookieException {
String zkPath = getCookiePath(bookieId);
try {
zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion());
} catch (NoNodeException e) {
throw new CookieNotFoundException(bookieId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e);
} catch (KeeperException e) {
throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId);
}
log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId);
}
@Override
public String getClusterInstanceId() throws BookieException {
String instanceId = null;
try {
if (zk.exists(ledgersRootPath, null) == null) {
log.error("BookKeeper metadata doesn't exist in zookeeper. "
+ "Has the cluster been initialized? "
+ "Try running bin/bookkeeper shell metaformat");
throw new KeeperException.NoNodeException("BookKeeper metadata");
}
try {
byte[] data = zk.getData(ledgersRootPath + "/"
+ INSTANCEID, false, null);
instanceId = new String(data, UTF_8);
} catch (KeeperException.NoNodeException e) {
log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification");
}
} catch (KeeperException | InterruptedException e) {
throw new MetadataStoreException("Failed to get cluster instance id", e);
}
return instanceId;
}
@Override
public boolean prepareFormat() throws Exception {
boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
// Create ledgers root node if not exists
if (!ledgerRootExists) {
ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, "".getBytes(StandardCharsets.UTF_8), zkAcls,
CreateMode.PERSISTENT);
}
// create available bookies node if not exists
if (!availableNodeExists) {
zk.create(bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
}
// create readonly bookies node if not exists
if (null == zk.exists(bookieReadonlyRegistrationPath, false)) {
zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT);
}
return ledgerRootExists;
}
@Override
public boolean initNewCluster() throws Exception {
String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
String instanceIdPath = ledgersRootPath + "/" + INSTANCEID;
log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers,
ledgersRootPath);
boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
if (ledgerRootExists) {
log.error("Ledger root path: {} already exists", ledgersRootPath);
return false;
}
List<Op> multiOps = Lists.newArrayListWithExpectedSize(4);
// Create ledgers root node
multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
// create available bookies node
multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT));
// create readonly bookies node
multiOps.add(Op.create(
bookieReadonlyRegistrationPath,
EMPTY_BYTE_ARRAY,
zkAcls,
CreateMode.PERSISTENT));
// create INSTANCEID
String instanceId = UUID.randomUUID().toString();
multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8),
zkAcls, CreateMode.PERSISTENT));
// execute the multi ops
zk.multi(multiOps);
// creates the new layout and stores in zookeeper
AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers,
ledgersRootPath, instanceId);
return true;
}
@Override
public boolean nukeExistingCluster() throws Exception {
String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}",
zkServers, ledgersRootPath);
boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false);
if (!ledgerRootExists) {
log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, "
+ "so exiting nuke operation", ledgersRootPath, zkServers);
return true;
}
boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false);
try (RegistrationClient regClient = new ZKRegistrationClient(
zk,
ledgersRootPath,
null
)) {
if (availableNodeExists) {
Collection<BookieSocketAddress> rwBookies = FutureUtils
.result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue();
if (rwBookies != null && !rwBookies.isEmpty()) {
log.error("Bookies are still up and connected to this cluster, "
+ "stop all bookies before nuking the cluster");
return false;
}
boolean readonlyNodeExists = null != zk.exists(bookieReadonlyRegistrationPath, false);
if (readonlyNodeExists) {
Collection<BookieSocketAddress> roBookies = FutureUtils
.result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue();
if (roBookies != null && !roBookies.isEmpty()) {
log.error("Readonly Bookies are still up and connected to this cluster, "
+ "stop all bookies before nuking the cluster");
return false;
}
}
}
}
LedgerManagerFactory ledgerManagerFactory =
AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager);
return ledgerManagerFactory.validateAndNukeExistingCluster(conf, layoutManager);
}
@Override
public boolean format() throws Exception {
// Clear underreplicated ledgers
try {
ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath)
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH);
} catch (KeeperException.NoNodeException e) {
if (log.isDebugEnabled()) {
log.debug("underreplicated ledgers root path node not exists in zookeeper to delete");
}
}
// Clear underreplicatedledger locks
try {
ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + '/'
+ BookKeeperConstants.UNDER_REPLICATION_LOCK);
} catch (KeeperException.NoNodeException e) {
if (log.isDebugEnabled()) {
log.debug("underreplicatedledger locks node not exists in zookeeper to delete");
}
}
// Clear the cookies
try {
ZKUtil.deleteRecursive(zk, cookiePath);
} catch (KeeperException.NoNodeException e) {
if (log.isDebugEnabled()) {
log.debug("cookies node not exists in zookeeper to delete");
}
}
// Clear the INSTANCEID
try {
zk.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1);
} catch (KeeperException.NoNodeException e) {
if (log.isDebugEnabled()) {
log.debug("INSTANCEID not exists in zookeeper to delete");
}
}
// create INSTANCEID
String instanceId = UUID.randomUUID().toString();
zk.create(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID,
instanceId.getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT);
log.info("Successfully formatted BookKeeper metadata");
return true;
}
@Override
public boolean isBookieRegistered(String bookieId) throws BookieException {
String regPath = bookieRegistrationPath + "/" + bookieId;
String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId;
try {
return ((null != zk.exists(regPath, false)) || (null != zk.exists(readonlyRegPath, false)));
} catch (KeeperException e) {
log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e);
throw new MetadataStoreException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}", bookieId,
e);
throw new MetadataStoreException(e);
}
}
}