blob: eef608dd5ecf5bfe8a6a5105216c4119d4a4bb32 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.meta;
import static org.apache.bookkeeper.meta.AbstractZkLedgerManager.isLeadgerIdGeneratorZnode;
import static org.apache.bookkeeper.meta.AbstractZkLedgerManager.isSpecialZnode;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.meta.LayoutManager.LedgerLayoutExistsException;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
/**
* Abstract ledger manager factory based on zookeeper, which provides common
* methods such as format and validateAndNukeExistingCluster.
*/
@Slf4j
public abstract class AbstractZkLedgerManagerFactory implements LedgerManagerFactory {
protected ZooKeeper zk;
@SuppressWarnings("deprecation")
@Override
public void format(AbstractConfiguration<?> conf, LayoutManager layoutManager)
throws InterruptedException, KeeperException, IOException {
try (AbstractZkLedgerManager ledgerManager = (AbstractZkLedgerManager) newLedgerManager()) {
String ledgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
List<String> children = zk.getChildren(ledgersRootPath, false);
for (String child : children) {
boolean lParentNode = !isSpecialZnode(child) && ledgerManager.isLedgerParentNode(child);
boolean lIdGenerator = isLeadgerIdGeneratorZnode(child);
if (lParentNode || lIdGenerator) {
ZKUtil.deleteRecursive(zk, ledgersRootPath + "/" + child);
}
}
}
Class<? extends LedgerManagerFactory> factoryClass;
try {
factoryClass = conf.getLedgerManagerFactoryClass();
} catch (ConfigurationException e) {
throw new IOException("Failed to get ledger manager factory class from configuration : ", e);
}
layoutManager.deleteLedgerLayout();
// Create new layout information again.
createNewLMFactory(conf, layoutManager, factoryClass);
}
@Override
public boolean validateAndNukeExistingCluster(AbstractConfiguration<?> conf, LayoutManager layoutManager)
throws InterruptedException, KeeperException, IOException {
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(conf);
String zkServers = ZKMetadataDriverBase.resolveZkServers(conf);
AbstractZkLedgerManager zkLedgerManager = (AbstractZkLedgerManager) newLedgerManager();
/*
* before proceeding with nuking existing cluster, make sure there
* are no unexpected znodes under ledgersRootPath
*/
List<String> ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
if ((!AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren))
&& (!zkLedgerManager.isLedgerParentNode(ledgersRootPathChildren))) {
log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
ledgersRootPathChildren, zkLedgersRootPath);
return false;
}
}
// formatting ledgermanager deletes ledger znodes
format(conf, layoutManager);
// now delete all the special nodes recursively
ledgersRootPathChildrenList = zk.getChildren(zkLedgersRootPath, false);
for (String ledgersRootPathChildren : ledgersRootPathChildrenList) {
if (AbstractZkLedgerManager.isSpecialZnode(ledgersRootPathChildren)) {
ZKUtil.deleteRecursive(zk, zkLedgersRootPath + "/" + ledgersRootPathChildren);
} else {
log.error("Found unexpected znode : {} under ledgersRootPath : {} so exiting nuke operation",
ledgersRootPathChildren, zkLedgersRootPath);
return false;
}
}
// finally deleting the ledgers rootpath
zk.delete(zkLedgersRootPath, -1);
log.info("Successfully nuked existing cluster, ZKServers: {} ledger root path: {}",
zkServers, zkLedgersRootPath);
return true;
}
// v1 layout
static final int V1 = 1;
/**
* Create new Ledger Manager Factory.
*
* @param conf
* Configuration Object.
* @param layoutManager
* layout manager
* @return new ledger manager factory
* @throws IOException
*/
@SuppressWarnings("deprecation")
public static LedgerManagerFactory newLedgerManagerFactory(
final AbstractConfiguration<?> conf, LayoutManager layoutManager)
throws IOException, InterruptedException {
String metadataServiceUriStr;
try {
metadataServiceUriStr = conf.getMetadataServiceUri();
} catch (ConfigurationException e) {
log.error("Failed to retrieve metadata service uri from configuration", e);
throw new IOException(
"Failed to retrieve metadata service uri from configuration", e);
}
Class<? extends LedgerManagerFactory> factoryClass;
String ledgerRootPath;
// `metadataServiceUri` can be null when constructing bookkeeper client using an external zookeeper client.
if (null == metadataServiceUriStr) { //
try {
factoryClass = conf.getLedgerManagerFactoryClass();
} catch (ConfigurationException e) {
log.error("Failed to get ledger manager factory class when using an external zookeeper client", e);
throw new IOException(
"Failed to get ledger manager factory class when using an external zookeeper client", e);
}
ledgerRootPath = conf.getZkLedgersRootPath();
} else {
URI metadataServiceUri = URI.create(metadataServiceUriStr);
factoryClass = ZKMetadataDriverBase.resolveLedgerManagerFactory(metadataServiceUri);
ledgerRootPath = metadataServiceUri.getPath();
}
if (null == ledgerRootPath || ledgerRootPath.length() == 0) {
throw new IOException("Empty Ledger Root Path.");
}
// if layoutManager is null, return the default ledger manager
if (layoutManager == null) {
return new FlatLedgerManagerFactory()
.initialize(conf, null, FlatLedgerManagerFactory.CUR_VERSION);
}
LedgerManagerFactory lmFactory;
// check that the configured ledger manager is
// compatible with the existing layout
LedgerLayout layout = layoutManager.readLedgerLayout();
if (layout == null) { // no existing layout
lmFactory = createNewLMFactory(conf, layoutManager, factoryClass);
return lmFactory
.initialize(conf, layoutManager, lmFactory.getCurrentVersion());
}
if (log.isDebugEnabled()) {
log.debug("read ledger layout {}", layout);
}
// there is existing layout, we need to look into the layout.
// handle pre V2 layout
if (layout.getLayoutFormatVersion() <= V1) {
// pre V2 layout we use type of ledger manager
String lmType = conf.getLedgerManagerType();
if (lmType != null && !layout.getManagerFactoryClass().equals(lmType)) {
throw new IOException("Configured layout " + lmType
+ " does not match existing layout " + layout.getManagerFactoryClass());
}
// create the ledger manager
if (FlatLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new FlatLedgerManagerFactory();
} else if (HierarchicalLedgerManagerFactory.NAME.equals(layout.getManagerFactoryClass())) {
lmFactory = new HierarchicalLedgerManagerFactory();
} else {
throw new IOException("Unknown ledger manager type: " + lmType);
}
return lmFactory.initialize(conf, layoutManager, layout.getManagerVersion());
}
// handle V2 layout case
if (factoryClass != null
&& !isSameLedgerManagerFactory(
conf, layout.getManagerFactoryClass(), factoryClass.getName())
&& conf.getProperty(AbstractConfiguration.LEDGER_MANAGER_FACTORY_DISABLE_CLASS_CHECK) == null) {
// Disable should ONLY happen during compatibility testing.
throw new IOException("Configured layout " + factoryClass.getName()
+ " does not match existing layout " + layout.getManagerFactoryClass());
}
if (factoryClass == null) {
// no factory specified in configuration
try {
Class<?> theCls = Class.forName(layout.getManagerFactoryClass());
if (!LedgerManagerFactory.class.isAssignableFrom(theCls)) {
throw new IOException("Wrong ledger manager factory " + layout.getManagerFactoryClass());
}
factoryClass = theCls.asSubclass(LedgerManagerFactory.class);
} catch (ClassNotFoundException | IOException e) {
factoryClass = attemptToResolveShadedLedgerManagerFactory(
conf,
layout.getManagerFactoryClass(),
e);
}
}
// instantiate a factory
lmFactory = ReflectionUtils.newInstance(factoryClass);
return lmFactory.initialize(conf, layoutManager, layout.getManagerVersion());
}
private static String normalizedLedgerManagerFactoryClassName(String factoryClass,
String shadedClassPrefix,
boolean isShadedClassAllowed) {
if (isShadedClassAllowed) {
if (null == factoryClass || null == shadedClassPrefix) {
return factoryClass;
} else {
return factoryClass.replace(shadedClassPrefix, "");
}
} else {
return factoryClass;
}
}
private static boolean isSameLedgerManagerFactory(
AbstractConfiguration<?> conf, String leftFactoryClass, String rightFactoryClass) {
leftFactoryClass = normalizedLedgerManagerFactoryClassName(
leftFactoryClass,
conf.getShadedLedgerManagerFactoryClassPrefix(),
conf.isShadedLedgerManagerFactoryClassAllowed());
rightFactoryClass = normalizedLedgerManagerFactoryClassName(
rightFactoryClass,
conf.getShadedLedgerManagerFactoryClassPrefix(),
conf.isShadedLedgerManagerFactoryClassAllowed());
return Objects.equals(leftFactoryClass, rightFactoryClass);
}
private static Class<? extends LedgerManagerFactory> attemptToResolveShadedLedgerManagerFactory(
AbstractConfiguration<?> conf, String lmfClassName, Throwable cause) throws IOException {
if (conf.isShadedLedgerManagerFactoryClassAllowed()) {
String shadedPrefix = conf.getShadedLedgerManagerFactoryClassPrefix();
log.warn("Failed to instantiate ledger manager factory {}, trying its shaded class {}{}",
lmfClassName, shadedPrefix, lmfClassName);
// try shading class
try {
return resolveShadedLedgerManagerFactory(lmfClassName, shadedPrefix);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Failed to instantiate ledger manager factory "
+ lmfClassName + " and its shaded class " + shadedPrefix + lmfClassName, cnfe);
}
} else {
throw new IOException("Failed to instantiate ledger manager factory "
+ lmfClassName, cause);
}
}
private static Class<? extends LedgerManagerFactory> resolveShadedLedgerManagerFactory(String lmfClassName,
String shadedClassPrefix)
throws ClassNotFoundException, IOException {
if (null == lmfClassName) {
return null;
} else {
// this is to address the issue when using the dlog shaded jar
Class<?> theCls = Class.forName(shadedClassPrefix + lmfClassName);
if (!LedgerManagerFactory.class.isAssignableFrom(theCls)) {
throw new IOException("Wrong shaded ledger manager factory : " + shadedClassPrefix + lmfClassName);
}
return theCls.asSubclass(LedgerManagerFactory.class);
}
}
/**
* Creates the new layout and stores in zookeeper and returns the
* LedgerManagerFactory instance.
*/
@SuppressWarnings("deprecation")
protected static LedgerManagerFactory createNewLMFactory(
final AbstractConfiguration conf, final LayoutManager layoutManager,
Class<? extends LedgerManagerFactory> factoryClass)
throws IOException, InterruptedException {
LedgerManagerFactory lmFactory;
LedgerLayout layout;
// use default ledger manager factory if no one provided
if (factoryClass == null) {
// for backward compatibility, check manager type
String lmType = conf.getLedgerManagerType();
if (lmType == null) {
factoryClass = HierarchicalLedgerManagerFactory.class;
} else {
if (FlatLedgerManagerFactory.NAME.equals(lmType)) {
factoryClass = FlatLedgerManagerFactory.class;
} else if (HierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
factoryClass = HierarchicalLedgerManagerFactory.class;
} else if (LongHierarchicalLedgerManagerFactory.NAME.equals(lmType)) {
factoryClass = LongHierarchicalLedgerManagerFactory.class;
} else {
throw new IOException("Unknown ledger manager type: "
+ lmType);
}
}
}
lmFactory = ReflectionUtils.newInstance(factoryClass);
layout = new LedgerLayout(factoryClass.getName(),
lmFactory.getCurrentVersion());
try {
layoutManager.storeLedgerLayout(layout);
} catch (LedgerLayoutExistsException e) {
LedgerLayout layout2 = layoutManager.readLedgerLayout();
if (!layout2.equals(layout)) {
throw new IOException(
"Contention writing to layout to zookeeper, "
+ " other layout " + layout2
+ " is incompatible with our " + "layout "
+ layout);
}
}
return lmFactory;
}
}