| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.bookkeeper.discover; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.EMPTY_BYTE_ARRAY; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.INSTANCEID; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Lists; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.bookie.BookieException; |
| import org.apache.bookkeeper.bookie.BookieException.BookieIllegalOpException; |
| import org.apache.bookkeeper.bookie.BookieException.CookieExistException; |
| import org.apache.bookkeeper.bookie.BookieException.CookieNotFoundException; |
| import org.apache.bookkeeper.bookie.BookieException.MetadataStoreException; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BKException.BKInterruptedException; |
| import org.apache.bookkeeper.client.BKException.MetaStoreException; |
| import org.apache.bookkeeper.common.concurrent.FutureUtils; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.meta.AbstractZkLedgerManagerFactory; |
| import org.apache.bookkeeper.meta.LayoutManager; |
| import org.apache.bookkeeper.meta.LedgerManagerFactory; |
| import org.apache.bookkeeper.meta.ZkLayoutManager; |
| import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; |
| import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; |
| import org.apache.bookkeeper.net.BookieId; |
| import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat; |
| import org.apache.bookkeeper.util.BookKeeperConstants; |
| import org.apache.bookkeeper.util.ZkUtils; |
| import org.apache.bookkeeper.versioning.LongVersion; |
| import org.apache.bookkeeper.versioning.Version; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.KeeperException.NodeExistsException; |
| import org.apache.zookeeper.Op; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| import org.apache.zookeeper.ZKUtil; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| |
| /** |
| * ZooKeeper Based {@link RegistrationManager}. |
| */ |
| @Slf4j |
| public class ZKRegistrationManager implements RegistrationManager { |
| |
| private static final Function<Throwable, BKException> EXCEPTION_FUNC = cause -> { |
| if (cause instanceof BKException) { |
| log.error("Failed to get bookie list : ", cause); |
| return (BKException) cause; |
| } else if (cause instanceof InterruptedException) { |
| log.error("Interrupted reading bookie list : ", cause); |
| return new BKInterruptedException(); |
| } else { |
| return new MetaStoreException(); |
| } |
| }; |
| |
| private final ServerConfiguration conf; |
| private final ZooKeeper zk; |
| private final List<ACL> zkAcls; |
| private final LayoutManager layoutManager; |
| |
| private volatile boolean zkRegManagerInitialized = false; |
| |
| // ledgers root path |
| private final String ledgersRootPath; |
| // cookie path |
| private final String cookiePath; |
| // registration paths |
| protected final String bookieRegistrationPath; |
| protected final String bookieReadonlyRegistrationPath; |
| // session timeout in milliseconds |
| private final int zkTimeoutMs; |
| |
| public ZKRegistrationManager(ServerConfiguration conf, |
| ZooKeeper zk, |
| RegistrationListener listener) { |
| this(conf, zk, ZKMetadataDriverBase.resolveZkLedgersRootPath(conf), listener); |
| } |
| |
| public ZKRegistrationManager(ServerConfiguration conf, |
| ZooKeeper zk, |
| String ledgersRootPath, |
| RegistrationListener listener) { |
| this.conf = conf; |
| this.zk = zk; |
| this.zkAcls = ZkUtils.getACLs(conf); |
| this.ledgersRootPath = ledgersRootPath; |
| this.cookiePath = ledgersRootPath + "/" + COOKIE_NODE; |
| this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE; |
| this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY; |
| this.zkTimeoutMs = conf.getZkTimeout(); |
| |
| this.layoutManager = new ZkLayoutManager( |
| zk, |
| ledgersRootPath, |
| zkAcls); |
| |
| this.zk.register(event -> { |
| if (!zkRegManagerInitialized) { |
| // do nothing until first registration |
| return; |
| } |
| // Check for expired connection. |
| if (event.getType().equals(EventType.None) |
| && event.getState().equals(KeeperState.Expired)) { |
| listener.onRegistrationExpired(); |
| } |
| }); |
| } |
| |
| @Override |
| public void close() { |
| // no-op |
| } |
| |
| /** |
| * Returns the CookiePath of the bookie in the ZooKeeper. |
| * |
| * @param bookieId bookie id |
| * @return |
| */ |
| public String getCookiePath(BookieId bookieId) { |
| return this.cookiePath + "/" + bookieId; |
| } |
| |
| // |
| // Registration Management |
| // |
| |
| /** |
| * Check existence of <i>regPath</i> and wait it expired if possible. |
| * |
| * @param regPath reg node path. |
| * @return true if regPath exists, otherwise return false |
| * @throws IOException if can't create reg path |
| */ |
| protected boolean checkRegNodeAndWaitExpired(String regPath) throws IOException { |
| final CountDownLatch prevNodeLatch = new CountDownLatch(1); |
| Watcher zkPrevRegNodewatcher = new Watcher() { |
| @Override |
| public void process(WatchedEvent event) { |
| // Check for prev znode deletion. Connection expiration is |
| // not handling, since bookie has logic to shutdown. |
| if (EventType.NodeDeleted == event.getType()) { |
| prevNodeLatch.countDown(); |
| } |
| } |
| }; |
| try { |
| Stat stat = zk.exists(regPath, zkPrevRegNodewatcher); |
| if (null != stat) { |
| // if the ephemeral owner isn't current zookeeper client |
| // wait for it to be expired. |
| if (stat.getEphemeralOwner() != zk.getSessionId()) { |
| log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout:" |
| + " {} ms for znode deletion", regPath, zkTimeoutMs); |
| // waiting for the previous bookie reg znode deletion |
| if (!prevNodeLatch.await(zkTimeoutMs, TimeUnit.MILLISECONDS)) { |
| throw new NodeExistsException(regPath); |
| } else { |
| return false; |
| } |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } catch (KeeperException ke) { |
| log.error("ZK exception checking and wait ephemeral znode {} expired : ", regPath, ke); |
| throw new IOException("ZK exception checking and wait ephemeral znode " |
| + regPath + " expired", ke); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| log.error("Interrupted checking and wait ephemeral znode {} expired : ", regPath, ie); |
| throw new IOException("Interrupted checking and wait ephemeral znode " |
| + regPath + " expired", ie); |
| } |
| } |
| |
| @Override |
| public void registerBookie(BookieId bookieId, boolean readOnly, |
| BookieServiceInfo bookieServiceInfo) throws BookieException { |
| if (!readOnly) { |
| String regPath = bookieRegistrationPath + "/" + bookieId; |
| doRegisterBookie(regPath, bookieServiceInfo); |
| } else { |
| doRegisterReadOnlyBookie(bookieId, bookieServiceInfo); |
| } |
| } |
| |
| @VisibleForTesting |
| static byte[] serializeBookieServiceInfo(BookieServiceInfo bookieServiceInfo) { |
| if (log.isDebugEnabled()) { |
| log.debug("serialize BookieServiceInfo {}", bookieServiceInfo); |
| } |
| try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { |
| BookieServiceInfoFormat.Builder builder = BookieServiceInfoFormat.newBuilder(); |
| List<BookieServiceInfoFormat.Endpoint> bsiEndpoints = bookieServiceInfo.getEndpoints().stream() |
| .map(e -> { |
| return BookieServiceInfoFormat.Endpoint.newBuilder() |
| .setId(e.getId()) |
| .setPort(e.getPort()) |
| .setHost(e.getHost()) |
| .setProtocol(e.getProtocol()) |
| .addAllAuth(e.getAuth()) |
| .addAllExtensions(e.getExtensions()) |
| .build(); |
| }) |
| .collect(Collectors.toList()); |
| |
| builder.addAllEndpoints(bsiEndpoints); |
| builder.putAllProperties(bookieServiceInfo.getProperties()); |
| |
| builder.build().writeTo(os); |
| return os.toByteArray(); |
| } catch (IOException err) { |
| log.error("Cannot serialize bookieServiceInfo from " + bookieServiceInfo); |
| throw new RuntimeException(err); |
| } |
| } |
| |
| private void doRegisterBookie(String regPath, BookieServiceInfo bookieServiceInfo) throws BookieException { |
| // ZK ephemeral node for this Bookie. |
| try { |
| if (!checkRegNodeAndWaitExpired(regPath)) { |
| // Create the ZK ephemeral node for this Bookie. |
| zk.create(regPath, serializeBookieServiceInfo(bookieServiceInfo), zkAcls, CreateMode.EPHEMERAL); |
| zkRegManagerInitialized = true; |
| } |
| } catch (KeeperException ke) { |
| log.error("ZK exception registering ephemeral Znode for Bookie!", ke); |
| // Throw an IOException back up. This will cause the Bookie |
| // constructor to error out. Alternatively, we could do a System |
| // exit here as this is a fatal error. |
| throw new MetadataStoreException(ke); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| log.error("Interrupted exception registering ephemeral Znode for Bookie!", ie); |
| // Throw an IOException back up. This will cause the Bookie |
| // constructor to error out. Alternatively, we could do a System |
| // exit here as this is a fatal error. |
| throw new MetadataStoreException(ie); |
| } catch (IOException e) { |
| throw new MetadataStoreException(e); |
| } |
| } |
| |
| private void doRegisterReadOnlyBookie(BookieId bookieId, BookieServiceInfo bookieServiceInfo) |
| throws BookieException { |
| try { |
| if (null == zk.exists(this.bookieReadonlyRegistrationPath, false)) { |
| try { |
| zk.create(this.bookieReadonlyRegistrationPath, serializeBookieServiceInfo(bookieServiceInfo), |
| zkAcls, CreateMode.PERSISTENT); |
| } catch (NodeExistsException e) { |
| // this node is just now created by someone. |
| } |
| } |
| |
| String regPath = bookieReadonlyRegistrationPath + "/" + bookieId; |
| doRegisterBookie(regPath, bookieServiceInfo); |
| // clear the write state |
| regPath = bookieRegistrationPath + "/" + bookieId; |
| try { |
| // Clear the current registered node |
| zk.delete(regPath, -1); |
| } catch (KeeperException.NoNodeException nne) { |
| log.warn("No writable bookie registered node {} when transitioning to readonly", |
| regPath, nne); |
| } |
| } catch (KeeperException | InterruptedException e) { |
| throw new MetadataStoreException(e); |
| } |
| } |
| |
| @Override |
| public void unregisterBookie(BookieId bookieId, boolean readOnly) throws BookieException { |
| String regPath; |
| if (!readOnly) { |
| regPath = bookieRegistrationPath + "/" + bookieId; |
| } else { |
| regPath = bookieReadonlyRegistrationPath + "/" + bookieId; |
| } |
| doUnregisterBookie(regPath); |
| } |
| |
| private void doUnregisterBookie(String regPath) throws BookieException { |
| try { |
| zk.delete(regPath, -1); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new MetadataStoreException(ie); |
| } catch (KeeperException e) { |
| throw new MetadataStoreException(e); |
| } |
| } |
| |
| // |
| // Cookie Management |
| // |
| |
| @Override |
| public void writeCookie(BookieId bookieId, |
| Versioned<byte[]> cookieData) throws BookieException { |
| String zkPath = getCookiePath(bookieId); |
| try { |
| if (Version.NEW == cookieData.getVersion()) { |
| if (zk.exists(cookiePath, false) == null) { |
| try { |
| zk.create(cookiePath, new byte[0], zkAcls, CreateMode.PERSISTENT); |
| } catch (NodeExistsException nne) { |
| log.info("More than one bookie tried to create {} at once. Safe to ignore.", |
| cookiePath); |
| } |
| } |
| zk.create(zkPath, cookieData.getValue(), zkAcls, CreateMode.PERSISTENT); |
| } else { |
| if (!(cookieData.getVersion() instanceof LongVersion)) { |
| throw new BookieIllegalOpException("Invalid version type, expected it to be LongVersion"); |
| } |
| zk.setData( |
| zkPath, |
| cookieData.getValue(), |
| (int) ((LongVersion) cookieData.getVersion()).getLongVersion()); |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new MetadataStoreException("Interrupted writing cookie for bookie " + bookieId, ie); |
| } catch (NoNodeException nne) { |
| throw new CookieNotFoundException(bookieId.toString()); |
| } catch (NodeExistsException nee) { |
| throw new CookieExistException(bookieId.toString()); |
| } catch (KeeperException e) { |
| throw new MetadataStoreException("Failed to write cookie for bookie " + bookieId); |
| } |
| } |
| |
| @Override |
| public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException { |
| String zkPath = getCookiePath(bookieId); |
| try { |
| Stat stat = zk.exists(zkPath, false); |
| byte[] data = zk.getData(zkPath, false, stat); |
| // sets stat version from ZooKeeper |
| LongVersion version = new LongVersion(stat.getVersion()); |
| return new Versioned<>(data, version); |
| } catch (NoNodeException nne) { |
| throw new CookieNotFoundException(bookieId.toString()); |
| } catch (KeeperException | InterruptedException e) { |
| throw new MetadataStoreException("Failed to read cookie for bookie " + bookieId); |
| } |
| } |
| |
| @Override |
| public void removeCookie(BookieId bookieId, Version version) throws BookieException { |
| String zkPath = getCookiePath(bookieId); |
| try { |
| zk.delete(zkPath, (int) ((LongVersion) version).getLongVersion()); |
| } catch (NoNodeException e) { |
| throw new CookieNotFoundException(bookieId.toString()); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new MetadataStoreException("Interrupted deleting cookie for bookie " + bookieId, e); |
| } catch (KeeperException e) { |
| throw new MetadataStoreException("Failed to delete cookie for bookie " + bookieId); |
| } |
| |
| log.info("Removed cookie from {} for bookie {}.", cookiePath, bookieId); |
| } |
| |
| |
| @Override |
| public String getClusterInstanceId() throws BookieException { |
| String instanceId = null; |
| try { |
| if (zk.exists(ledgersRootPath, null) == null) { |
| log.error("BookKeeper metadata doesn't exist in zookeeper. " |
| + "Has the cluster been initialized? " |
| + "Try running bin/bookkeeper shell metaformat"); |
| throw new KeeperException.NoNodeException("BookKeeper metadata"); |
| } |
| try { |
| byte[] data = zk.getData(ledgersRootPath + "/" |
| + INSTANCEID, false, null); |
| instanceId = new String(data, UTF_8); |
| } catch (KeeperException.NoNodeException e) { |
| log.info("INSTANCEID not exists in zookeeper. Not considering it for data verification"); |
| } |
| } catch (KeeperException | InterruptedException e) { |
| throw new MetadataStoreException("Failed to get cluster instance id", e); |
| } |
| return instanceId; |
| } |
| |
| @Override |
| public boolean prepareFormat() throws Exception { |
| boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); |
| boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false); |
| // Create ledgers root node if not exists |
| if (!ledgerRootExists) { |
| ZkUtils.createFullPathOptimistic(zk, ledgersRootPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, |
| CreateMode.PERSISTENT); |
| } |
| // create available bookies node if not exists |
| if (!availableNodeExists) { |
| zk.create(bookieRegistrationPath, "".getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT); |
| } |
| |
| // create readonly bookies node if not exists |
| if (null == zk.exists(bookieReadonlyRegistrationPath, false)) { |
| zk.create(bookieReadonlyRegistrationPath, new byte[0], zkAcls, CreateMode.PERSISTENT); |
| } |
| |
| return ledgerRootExists; |
| } |
| |
| @Override |
| public boolean initNewCluster() throws Exception { |
| String zkServers = ZKMetadataDriverBase.resolveZkServers(conf); |
| String instanceIdPath = ledgersRootPath + "/" + INSTANCEID; |
| log.info("Initializing ZooKeeper metadata for new cluster, ZKServers: {} ledger root path: {}", zkServers, |
| ledgersRootPath); |
| |
| boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); |
| |
| if (ledgerRootExists) { |
| log.error("Ledger root path: {} already exists", ledgersRootPath); |
| return false; |
| } |
| |
| List<Op> multiOps = Lists.newArrayListWithExpectedSize(4); |
| |
| // Create ledgers root node |
| multiOps.add(Op.create(ledgersRootPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT)); |
| |
| // create available bookies node |
| multiOps.add(Op.create(bookieRegistrationPath, EMPTY_BYTE_ARRAY, zkAcls, CreateMode.PERSISTENT)); |
| |
| // create readonly bookies node |
| multiOps.add(Op.create( |
| bookieReadonlyRegistrationPath, |
| EMPTY_BYTE_ARRAY, |
| zkAcls, |
| CreateMode.PERSISTENT)); |
| |
| // create INSTANCEID |
| String instanceId = UUID.randomUUID().toString(); |
| multiOps.add(Op.create(instanceIdPath, instanceId.getBytes(UTF_8), |
| zkAcls, CreateMode.PERSISTENT)); |
| |
| // execute the multi ops |
| zk.multi(multiOps); |
| |
| // creates the new layout and stores in zookeeper |
| AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager); |
| |
| log.info("Successfully initiated cluster. ZKServers: {} ledger root path: {} instanceId: {}", zkServers, |
| ledgersRootPath, instanceId); |
| return true; |
| } |
| |
| @Override |
| public boolean nukeExistingCluster() throws Exception { |
| String zkServers = ZKMetadataDriverBase.resolveZkServers(conf); |
| log.info("Nuking ZooKeeper metadata of existing cluster, ZKServers: {} ledger root path: {}", |
| zkServers, ledgersRootPath); |
| |
| boolean ledgerRootExists = null != zk.exists(ledgersRootPath, false); |
| if (!ledgerRootExists) { |
| log.info("There is no existing cluster with ledgersRootPath: {} in ZKServers: {}, " |
| + "so exiting nuke operation", ledgersRootPath, zkServers); |
| return true; |
| } |
| |
| boolean availableNodeExists = null != zk.exists(bookieRegistrationPath, false); |
| try (RegistrationClient regClient = new ZKRegistrationClient( |
| zk, |
| ledgersRootPath, |
| null, |
| false |
| )) { |
| if (availableNodeExists) { |
| Collection<BookieId> rwBookies = FutureUtils |
| .result(regClient.getWritableBookies(), EXCEPTION_FUNC).getValue(); |
| if (rwBookies != null && !rwBookies.isEmpty()) { |
| log.error("Bookies are still up and connected to this cluster, " |
| + "stop all bookies before nuking the cluster"); |
| return false; |
| } |
| |
| boolean readonlyNodeExists = null != zk.exists(bookieReadonlyRegistrationPath, false); |
| if (readonlyNodeExists) { |
| Collection<BookieId> roBookies = FutureUtils |
| .result(regClient.getReadOnlyBookies(), EXCEPTION_FUNC).getValue(); |
| if (roBookies != null && !roBookies.isEmpty()) { |
| log.error("Readonly Bookies are still up and connected to this cluster, " |
| + "stop all bookies before nuking the cluster"); |
| return false; |
| } |
| } |
| } |
| } |
| |
| LedgerManagerFactory ledgerManagerFactory = |
| AbstractZkLedgerManagerFactory.newLedgerManagerFactory(conf, layoutManager); |
| return ledgerManagerFactory.validateAndNukeExistingCluster(conf, layoutManager); |
| } |
| |
| @Override |
| public boolean format() throws Exception { |
| // Clear underreplicated ledgers |
| try { |
| ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) |
| + BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH); |
| } catch (KeeperException.NoNodeException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("underreplicated ledgers root path node not exists in zookeeper to delete"); |
| } |
| } |
| |
| // Clear underreplicatedledger locks |
| try { |
| ZKUtil.deleteRecursive(zk, ZkLedgerUnderreplicationManager.getBasePath(ledgersRootPath) + '/' |
| + BookKeeperConstants.UNDER_REPLICATION_LOCK); |
| } catch (KeeperException.NoNodeException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("underreplicatedledger locks node not exists in zookeeper to delete"); |
| } |
| } |
| |
| // Clear the cookies |
| try { |
| ZKUtil.deleteRecursive(zk, cookiePath); |
| } catch (KeeperException.NoNodeException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("cookies node not exists in zookeeper to delete"); |
| } |
| } |
| |
| // Clear the INSTANCEID |
| try { |
| zk.delete(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, -1); |
| } catch (KeeperException.NoNodeException e) { |
| if (log.isDebugEnabled()) { |
| log.debug("INSTANCEID not exists in zookeeper to delete"); |
| } |
| } |
| |
| // create INSTANCEID |
| String instanceId = UUID.randomUUID().toString(); |
| zk.create(ledgersRootPath + "/" + BookKeeperConstants.INSTANCEID, |
| instanceId.getBytes(StandardCharsets.UTF_8), zkAcls, CreateMode.PERSISTENT); |
| |
| log.info("Successfully formatted BookKeeper metadata"); |
| return true; |
| } |
| |
| @Override |
| public boolean isBookieRegistered(BookieId bookieId) throws BookieException { |
| String regPath = bookieRegistrationPath + "/" + bookieId; |
| String readonlyRegPath = bookieReadonlyRegistrationPath + "/" + bookieId; |
| try { |
| return ((null != zk.exists(regPath, false)) || (null != zk.exists(readonlyRegPath, false))); |
| } catch (KeeperException e) { |
| log.error("ZK exception while checking registration ephemeral znodes for BookieId: {}", bookieId, e); |
| throw new MetadataStoreException(e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| log.error("InterruptedException while checking registration ephemeral znodes for BookieId: {}", bookieId, |
| e); |
| throw new MetadataStoreException(e); |
| } |
| } |
| } |