blob: b9e3a31c55f81ed49da92d83a7231b4a1b6f7bb0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.namespace;
import com.google.common.base.Objects;
import com.google.common.collect.Sets;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static com.google.common.base.Preconditions.*;
/**
* The basic service for managing a set of namespace drivers.
*/
public class NamespaceDriverManager {
private static final Logger logger = LoggerFactory.getLogger(NamespaceDriverManager.class);
static class NamespaceDriverInfo {
final Class<? extends NamespaceDriver> driverClass;
final String driverClassName;
NamespaceDriverInfo(Class<? extends NamespaceDriver> driverClass) {
this.driverClass = driverClass;
this.driverClassName = this.driverClass.getName();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("driver[")
.append(driverClassName)
.append("]");
return sb.toString();
}
}
private static final ConcurrentMap<String, NamespaceDriverInfo> drivers;
private static boolean initialized = false;
static {
drivers = new ConcurrentHashMap<String, NamespaceDriverInfo>();
initialize();
}
static void initialize() {
if (initialized) {
return;
}
loadInitialDrivers();
initialized = true;
logger.info("DistributedLog NamespaceDriverManager initialized");
}
private static void loadInitialDrivers() {
Set<String> driverList = Sets.newHashSet();
// add default bookkeeper based driver
driverList.add(BKNamespaceDriver.class.getName());
// load drivers from system property
String driversStr = System.getProperty("distributedlog.namespace.drivers");
if (null != driversStr) {
String[] driversArray = StringUtils.split(driversStr, ':');
for (String driver : driversArray) {
driverList.add(driver);
}
}
// initialize the drivers
for (String driverClsName : driverList) {
try {
NamespaceDriver driver =
ReflectionUtils.newInstance(driverClsName, NamespaceDriver.class);
NamespaceDriverInfo driverInfo = new NamespaceDriverInfo(driver.getClass());
drivers.put(driver.getScheme().toLowerCase(), driverInfo);
} catch (Exception ex) {
logger.warn("Failed to load namespace driver {} : ", driverClsName, ex);
}
}
}
/**
* Prevent the NamespaceDriverManager class from being instantiated.
*/
private NamespaceDriverManager() {}
/**
* Register the namespace {@code driver}.
*
* @param driver the namespace driver
* @return the namespace driver manager
*/
public static void registerDriver(String backend, Class<? extends NamespaceDriver> driver) {
if (!initialized) {
initialize();
}
String scheme = backend.toLowerCase();
NamespaceDriverInfo oldDriverInfo = drivers.get(scheme);
if (null != oldDriverInfo) {
return;
}
NamespaceDriverInfo newDriverInfo = new NamespaceDriverInfo(driver);
oldDriverInfo = drivers.putIfAbsent(scheme, newDriverInfo);
if (null != oldDriverInfo) {
logger.debug("Driver for {} is already there.", scheme);
}
}
/**
* Retrieve the namespace driver for {@code scheme}.
*
* @param scheme the scheme for the namespace driver
* @return the namespace driver
* @throws NullPointerException when scheme is null
*/
public static NamespaceDriver getDriver(String scheme) {
checkNotNull(scheme, "Driver Scheme is null");
if (!initialized) {
initialize();
}
NamespaceDriverInfo driverInfo = drivers.get(scheme.toLowerCase());
if (null == driverInfo) {
throw new IllegalArgumentException("Unknown backend " + scheme);
}
return ReflectionUtils.newInstance(driverInfo.driverClass);
}
/**
* Retrieve the namespace driver for {@code uri}.
*
* @param uri the distributedlog uri
* @return the namespace driver for {@code uri}
* @throws NullPointerException if the distributedlog {@code uri} is null or doesn't have scheme
* or there is no namespace driver registered for the scheme
* @throws IllegalArgumentException if the distributedlog {@code uri} scheme is illegal
*/
public static NamespaceDriver getDriver(URI uri) {
// Validate the uri and load the backend according to scheme
checkNotNull(uri, "DistributedLog uri is null");
String scheme = uri.getScheme();
checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
scheme = scheme.toLowerCase();
String[] schemeParts = StringUtils.split(scheme, '-');
checkArgument(schemeParts.length > 0,
"Invalid distributedlog scheme found : " + uri);
checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
"Unknown distributedlog scheme found : " + uri);
// bookkeeper is the default backend
String backend = DistributedLogConstants.BACKEND_BK;
if (schemeParts.length > 1) {
backend = schemeParts[1];
}
return getDriver(backend);
}
}