blob: bec6f582b7e19c60667ecc9f7ed288ca201d88b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.feature;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
import org.apache.distributedlog.common.config.ConfigurationListener;
import org.apache.distributedlog.common.config.ConfigurationSubscription;
import org.apache.distributedlog.common.config.FileConfigurationBuilder;
import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Feature Provider based dynamic configuration.
*/
public class DynamicConfigurationFeatureProvider extends AbstractFeatureProvider
implements ConfigurationListener {
private static final Logger logger = LoggerFactory.getLogger(DynamicConfigurationFeatureProvider.class);
private final ConcurrentBaseConfiguration featuresConf;
private ConfigurationSubscription featuresConfSubscription;
private final ConcurrentMap<String, SettableFeature> features;
private final ScheduledExecutorService executorService;
public DynamicConfigurationFeatureProvider(String rootScope,
DistributedLogConfiguration conf,
StatsLogger statsLogger) {
super(rootScope, conf, statsLogger);
this.features = new ConcurrentHashMap<String, SettableFeature>();
this.featuresConf = new ConcurrentBaseConfiguration();
this.executorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("DynamicConfigurationFeatureProvider-%d").build());
}
ConcurrentBaseConfiguration getFeatureConf() {
return featuresConf;
}
ConfigurationSubscription getFeatureConfSubscription() {
return featuresConfSubscription;
}
@Override
public void start() throws IOException {
List<FileConfigurationBuilder> fileConfigBuilders =
Lists.newArrayListWithExpectedSize(2);
String baseConfigPath = conf.getFileFeatureProviderBaseConfigPath();
checkNotNull(baseConfigPath);
File baseConfigFile = new File(baseConfigPath);
FileConfigurationBuilder baseProperties =
new PropertiesConfigurationBuilder(baseConfigFile.toURI().toURL());
fileConfigBuilders.add(baseProperties);
String overlayConfigPath = conf.getFileFeatureProviderOverlayConfigPath();
if (null != overlayConfigPath) {
File overlayConfigFile = new File(overlayConfigPath);
FileConfigurationBuilder overlayProperties =
new PropertiesConfigurationBuilder(overlayConfigFile.toURI().toURL());
fileConfigBuilders.add(overlayProperties);
}
try {
this.featuresConfSubscription = new ConfigurationSubscription(
this.featuresConf,
fileConfigBuilders,
executorService,
conf.getDynamicConfigReloadIntervalSec(),
TimeUnit.SECONDS);
} catch (ConfigurationException e) {
throw new IOException("Failed to register subscription on features configuration");
}
this.featuresConfSubscription.registerListener(this);
}
@Override
public void stop() {
this.executorService.shutdown();
}
@Override
public void onReload(ConcurrentBaseConfiguration conf) {
for (Map.Entry<String, SettableFeature> feature : features.entrySet()) {
String featureName = feature.getKey();
int availability = conf.getInt(featureName, 0);
if (availability != feature.getValue().availability()) {
feature.getValue().set(availability);
logger.info("Reload feature {}={}", featureName, availability);
}
}
}
@Override
protected Feature makeFeature(String featureName) {
return ConfigurationFeatureProvider.makeFeature(
featuresConf, features, featureName);
}
@Override
protected FeatureProvider makeProvider(String fullScopeName) {
return new ConfigurationFeatureProvider(
fullScopeName, featuresConf, features);
}
}