blob: a96f8438f168a7ee9377e844d0d2ae43bb6b2b0f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.configcenter.support.zookeeper;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.configcenter.ConfigurationListener;
import org.apache.dubbo.common.config.configcenter.TreePathDynamicConfiguration;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.zookeeper.ZookeeperClient;
import org.apache.dubbo.remoting.zookeeper.ZookeeperTransporter;

import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * {@link TreePathDynamicConfiguration} implementation that stores configuration in ZooKeeper.
 *
 * <p>Reads, writes, deletes and child listings are delegated to a {@link ZookeeperClient};
 * configuration listeners are registered with a {@link CacheListener} that is attached to the
 * client as a data listener on the root path.
 */
public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration {

    /** Single-thread pool handed to the client together with the data listener; shut down on close. */
    private final ExecutorService executor;

    // The final root path would be: /configRootPath/"config"
    private final String rootPath;

    private final ZookeeperClient zkClient;

    /** Latch shared with the {@link CacheListener}; the constructor waits on it before returning. */
    private final CountDownLatch initializedLatch;

    private final CacheListener cacheListener;

    private final URL url;

    /**
     * Connects to ZooKeeper, attaches the cache listener to the root path and waits for the
     * initialization latch to be released.
     *
     * <p>The wait is bounded by the {@code init.timeout} URL parameter (milliseconds, default 5000).
     * If the calling thread is interrupted while waiting, a warning is logged, the interrupt flag
     * is restored and construction completes without the latch having been released.
     *
     * @param url                  config-center URL; also the source of the root path and timeout
     * @param zookeeperTransporter transporter used to obtain the {@link ZookeeperClient}
     * @throws IllegalStateException if the latch is not released within the timeout
     */
    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        this.url = url;
        this.rootPath = getRootPath(url);
        this.initializedLatch = new CountDownLatch(1);
        this.cacheListener = new CacheListener(rootPath, initializedLatch);
        this.executor = Executors.newFixedThreadPool(1,
                new NamedThreadFactory(this.getClass().getSimpleName(), true));

        this.zkClient = zookeeperTransporter.connect(url);
        zkClient.addDataListener(rootPath, cacheListener, executor);
        try {
            // Wait for connection
            long timeout = url.getParameter("init.timeout", 5000);
            boolean isCountDown = this.initializedLatch.await(timeout, TimeUnit.MILLISECONDS);
            if (!isCountDown) {
                throw new IllegalStateException("Failed to receive INITIALIZED event from zookeeper, pls. check if url "
                        + url + " is correct");
            }
        } catch (InterruptedException e) {
            // Keep the original best-effort behaviour (do not fail construction), but do not lose
            // the interruption: restore the flag so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            logger.warn("Failed to build local cache for config center (zookeeper)." + url, e);
        }
    }

    /**
     * Reads the content stored under the given key directly from the ZooKeeper client.
     *
     * @param key e.g., {service}.configurators, {service}.tagrouters, {group}.dubbo.properties
     * @return whatever {@link ZookeeperClient#getContent(String)} returns for {@code key}
     */
    @Override
    public String getInternalProperty(String key) {
        return zkClient.getContent(key);
    }

    /**
     * Closes the ZooKeeper client and shuts down the listener executor.
     *
     * <p>The executor is shut down in a {@code finally} block so its worker thread is released
     * even when closing the client throws.
     */
    @Override
    protected void doClose() throws Exception {
        try {
            zkClient.close();
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Writes {@code content} to the node at {@code pathKey}.
     *
     * @return always {@code true}; failures surface only as exceptions from the client
     */
    @Override
    protected boolean doPublishConfig(String pathKey, String content) throws Exception {
        // NOTE(review): the trailing 'false' is presumably the "ephemeral" flag, i.e. the node is
        // created as persistent - confirm against ZookeeperClient#create.
        zkClient.create(pathKey, content, false);
        return true;
    }

    /**
     * Reads the content of the node at {@code pathKey}.
     */
    @Override
    protected String doGetConfig(String pathKey) throws Exception {
        return zkClient.getContent(pathKey);
    }

    /**
     * Deletes the node at {@code pathKey}.
     *
     * @return always {@code true}; failures surface only as exceptions from the client
     */
    @Override
    protected boolean doRemoveConfig(String pathKey) throws Exception {
        zkClient.delete(pathKey);
        return true;
    }

    /**
     * Lists the children of the node at {@code groupPath}.
     */
    @Override
    protected Collection<String> doGetConfigKeys(String groupPath) {
        return zkClient.getChildren(groupPath);
    }

    /**
     * Registers {@code listener} for {@code pathKey} with the cache listener.
     */
    @Override
    protected void doAddListener(String pathKey, ConfigurationListener listener) {
        cacheListener.addListener(pathKey, listener);
    }

    /**
     * Unregisters {@code listener} for {@code pathKey} from the cache listener.
     */
    @Override
    protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
        cacheListener.removeListener(pathKey, listener);
    }
}