blob: 2a3caaf740a87539c46f465f8d7d0d38f713cb39 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.reg.zookeeper;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.cloud.reg.exception.RegExceptionHandler;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
/**
* Registry center of ZooKeeper.
*/
@Slf4j
public final class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
@Getter(AccessLevel.PROTECTED)
private ZookeeperConfiguration zkConfig;
private final Map<String, TreeCache> caches = new HashMap<>();
@Getter
private CuratorFramework client;
public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
this.zkConfig = zkConfig;
}
@Override
public void init() {
log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getServerLists())
.retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
.namespace(zkConfig.getNamespace());
if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
}
if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
}
if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
builder.authorization("digest", zkConfig.getDigest().getBytes(Charsets.UTF_8))
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
client = builder.build();
client.start();
try {
if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
client.close();
throw new KeeperException.OperationTimeoutException();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public void close() {
for (Entry<String, TreeCache> each : caches.entrySet()) {
each.getValue().close();
}
waitForCacheClose();
CloseableUtils.closeQuietly(client);
}
/*
* TODO
* sleep 500ms, let cache client close first and then client, otherwise will throw exception
* reference:https://issues.apache.org/jira/browse/CURATOR-157
*/
private void waitForCacheClose() {
try {
Thread.sleep(500L);
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public String get(final String key) {
TreeCache cache = findTreeCache(key);
if (null == cache) {
return getDirectly(key);
}
ChildData resultInCache = cache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
}
return getDirectly(key);
}
private TreeCache findTreeCache(final String key) {
for (Entry<String, TreeCache> entry : caches.entrySet()) {
if (key.startsWith(entry.getKey())) {
return entry.getValue();
}
}
return null;
}
@Override
public String getDirectly(final String key) {
try {
return new String(client.getData().forPath(key), Charsets.UTF_8);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return null;
}
}
@Override
public List<String> getChildrenKeys(final String key) {
try {
List<String> result = client.getChildren().forPath(key);
Collections.sort(result, new Comparator<String>() {
@Override
public int compare(final String o1, final String o2) {
return o2.compareTo(o1);
}
});
return result;
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return Collections.emptyList();
}
}
@Override
public int getNumChildren(final String key) {
try {
Stat stat = client.checkExists().forPath(key);
if (null != stat) {
return stat.getNumChildren();
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
return 0;
}
@Override
public boolean isExisted(final String key) {
try {
return null != client.checkExists().forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
return false;
}
}
@Override
public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(Charsets.UTF_8));
} else {
update(key, value);
}
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public void update(final String key, final String value) {
try {
client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public void persistEphemeral(final String key, final String value) {
try {
if (isExisted(key)) {
client.delete().deletingChildrenIfNeeded().forPath(key);
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Charsets.UTF_8));
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public String persistSequential(final String key, final String value) {
try {
return client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(key, value.getBytes(Charsets.UTF_8));
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
return null;
}
@Override
public void persistEphemeralSequential(final String key) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public void remove(final String key) {
try {
client.delete().deletingChildrenIfNeeded().forPath(key);
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
}
@Override
public long getRegistryCenterTime(final String key) {
long result = 0L;
try {
persist(key, "");
result = client.checkExists().forPath(key).getMtime();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
Preconditions.checkState(0L != result, "Cannot get registry center time.");
return result;
}
@Override
public Object getRawClient() {
return client;
}
@Override
public void addCacheData(final String cachePath) {
TreeCache cache = new TreeCache(client, cachePath);
try {
cache.start();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
RegExceptionHandler.handleException(ex);
}
caches.put(cachePath + "/", cache);
}
@Override
public void evictCacheData(final String cachePath) {
TreeCache cache = caches.remove(cachePath + "/");
if (null != cache) {
cache.close();
}
}
@Override
public Object getRawCache(final String cachePath) {
return caches.get(cachePath + "/");
}
}