blob: a01bb7000a6cc4a7e185539d127b8f7c929aa940 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.namespace;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.twitter.distributedlog.BKDistributedLogNamespace;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Builder to construct a <code>DistributedLogNamespace</code>.
* The builder takes the responsibility of loading backend according to the uri.
*
* @see DistributedLogNamespace
* @since 0.3.32
*/
public class DistributedLogNamespaceBuilder {
private static final Logger logger = LoggerFactory.getLogger(DistributedLogNamespaceBuilder.class);
public static DistributedLogNamespaceBuilder newBuilder() {
return new DistributedLogNamespaceBuilder();
}
private DistributedLogConfiguration _conf = null;
private URI _uri = null;
private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
private FeatureProvider _featureProvider = null;
private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
private int _regionId = DistributedLogConstants.LOCAL_REGION_ID;
// private constructor
private DistributedLogNamespaceBuilder() {}
/**
* DistributedLog Configuration used for the namespace.
*
* @param conf
* distributedlog configuration
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder conf(DistributedLogConfiguration conf) {
this._conf = conf;
return this;
}
/**
* Namespace Location.
*
* @param uri
* namespace location uri.
* @see DistributedLogNamespace
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder uri(URI uri) {
this._uri = uri;
return this;
}
/**
* Stats Logger used for stats collection
*
* @param statsLogger
* stats logger
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder statsLogger(StatsLogger statsLogger) {
this._statsLogger = statsLogger;
return this;
}
/**
* Stats Logger used for collecting per log stats.
*
* @param statsLogger
* stats logger for collecting per log stats
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) {
this._perLogStatsLogger = statsLogger;
return this;
}
/**
* Feature provider used to control the availabilities of features in the namespace.
*
* @param featureProvider
* feature provider to control availabilities of features.
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder featureProvider(FeatureProvider featureProvider) {
this._featureProvider = featureProvider;
return this;
}
/**
* Client Id used for accessing the namespace
*
* @param clientId
* client id used for accessing the namespace
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder clientId(String clientId) {
this._clientId = clientId;
return this;
}
/**
* Region Id used for encoding logs in the namespace. The region id
* is useful when the namespace is globally spanning over regions.
*
* @param regionId
* region id.
* @return namespace builder.
*/
public DistributedLogNamespaceBuilder regionId(int regionId) {
this._regionId = regionId;
return this;
}
/**
* Build the namespace.
*
* @return the namespace instance.
* @throws IllegalArgumentException when there is illegal argument provided in the builder
* @throws NullPointerException when there is null argument provided in the builder
* @throws IOException when fail to build the backend
*/
public DistributedLogNamespace build()
throws IllegalArgumentException, NullPointerException, IOException {
// Check arguments
Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
Preconditions.checkNotNull(_uri, "No DistributedLog URI");
// Validate the uri and load the backend according to scheme
String scheme = _uri.getScheme();
Preconditions.checkNotNull(scheme, "Invalid DistributedLog URI : " + _uri);
String[] schemeParts = StringUtils.split(scheme, '-');
Preconditions.checkArgument(schemeParts.length > 0,
"Invalid distributedlog scheme found : " + _uri);
Preconditions.checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
"Unknown distributedlog scheme found : " + _uri);
// both distributedlog: & distributedlog-bk: use bookkeeper as the backend
// TODO: we could do reflection to load backend in future.
// if we are going to support other backends : e.g. 'distributedlog-mem:'.
if (schemeParts.length > 1) {
String backendProvider = schemeParts[1];
Preconditions.checkArgument(Objects.equal(DistributedLogConstants.BACKEND_BK, backendProvider.toLowerCase()),
"Backend '" + backendProvider + "' is not supported yet.");
}
// Built the feature provider
FeatureProvider featureProvider;
if (null == _featureProvider) {
featureProvider = new SettableFeatureProvider("", 0);
logger.info("No feature provider is set. All features are disabled now.");
} else {
featureProvider = _featureProvider;
}
URI bkUri;
try {
bkUri = new URI(
schemeParts[0], // remove backend info from bookkeeper backend
_uri.getAuthority(),
_uri.getPath(),
_uri.getQuery(),
_uri.getFragment());
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid distributedlog uri found : " + _uri, e);
}
return BKDistributedLogNamespace.newBuilder()
.conf(_conf)
.uri(bkUri)
.statsLogger(_statsLogger)
.featureProvider(featureProvider)
.clientId(_clientId)
.regionId(_regionId)
.build();
}
}