blob: 889e2cbfe4f2c6afe9194d3bbd4377f3e226c098 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shenyu.sync.data.etcd;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.shenyu.common.constant.DefaultPathConstants;
import org.apache.shenyu.common.dto.AppAuthData;
import org.apache.shenyu.common.dto.MetaData;
import org.apache.shenyu.common.dto.PluginData;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
import org.apache.shenyu.common.enums.ConfigGroupEnum;
import org.apache.shenyu.common.utils.GsonUtils;
import org.apache.shenyu.sync.data.api.AuthDataSubscriber;
import org.apache.shenyu.sync.data.api.MetaDataSubscriber;
import org.apache.shenyu.sync.data.api.PluginDataSubscriber;
import org.apache.shenyu.sync.data.api.SyncDataService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
/**
* Data synchronize of etcd.
*/
public class EtcdSyncDataService implements SyncDataService {
/**
* logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(EtcdSyncDataService.class);
private static final String PRE_FIX = "/shenyu";
private final EtcdClient etcdClient;
private final PluginDataSubscriber pluginDataSubscriber;
private final List<MetaDataSubscriber> metaDataSubscribers;
private final List<AuthDataSubscriber> authDataSubscribers;
private Map<String, String> keysMap = new ConcurrentHashMap<>();
/**
* Instantiates a new Zookeeper cache manager.
*
* @param etcdClient the etcd client
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public EtcdSyncDataService(final EtcdClient etcdClient,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
this.etcdClient = etcdClient;
this.pluginDataSubscriber = pluginDataSubscriber;
this.metaDataSubscribers = metaDataSubscribers;
this.authDataSubscribers = authDataSubscribers;
watchAllKeys();
watcherData();
watchAppAuth();
watchMetaData();
}
private void watchAllKeys() {
keysMap = etcdClient.getKeysMapByPrefix(PRE_FIX);
etcdClient.watchDataChange(PRE_FIX, (updateKey, updateValue) -> {
keysMap.put(updateKey, updateValue);
}, deleteKey -> {
keysMap.remove(deleteKey);
});
}
private void watcherData() {
final String pluginParent = DefaultPathConstants.PLUGIN_PARENT;
List<String> pluginChildren = etcdClientGetChildrenByMap(pluginParent, keysMap);
for (String pluginName : pluginChildren) {
watcherAll(pluginName);
}
etcdClient.watchChildChange(pluginParent, (updateNode, updateValue) -> {
if (!updateNode.isEmpty()) {
watcherAll(updateNode);
}
}, null);
}
private void watcherAll(final String pluginName) {
watcherPlugin(pluginName);
watcherSelector(pluginName);
watcherRule(pluginName);
}
private void watcherPlugin(final String pluginName) {
String pluginPath = DefaultPathConstants.buildPluginPath(pluginName);
cachePluginData(keysMap.get(pluginPath));
subscribePluginDataChanges(pluginPath, pluginName);
}
private void watcherSelector(final String pluginName) {
String selectorParentPath = DefaultPathConstants.buildSelectorParentPath(pluginName);
List<String> childrenList = etcdClientGetChildrenByMap(selectorParentPath, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(selectorParentPath, children);
cacheSelectorData(keysMap.get(realPath));
subscribeSelectorDataChanges(realPath);
});
}
subscribeChildChanges(ConfigGroupEnum.SELECTOR, selectorParentPath);
}
private void watcherRule(final String pluginName) {
String ruleParent = DefaultPathConstants.buildRuleParentPath(pluginName);
List<String> childrenList = etcdClientGetChildrenByMap(ruleParent, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(ruleParent, children);
cacheRuleData(keysMap.get(realPath));
subscribeRuleDataChanges(realPath);
});
}
subscribeChildChanges(ConfigGroupEnum.RULE, ruleParent);
}
private void watchAppAuth() {
final String appAuthParent = DefaultPathConstants.APP_AUTH_PARENT;
List<String> childrenList = etcdClientGetChildrenByMap(appAuthParent, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(appAuthParent, children);
cacheAuthData(keysMap.get(realPath));
subscribeAppAuthDataChanges(realPath);
});
}
subscribeChildChanges(ConfigGroupEnum.APP_AUTH, appAuthParent);
}
private void watchMetaData() {
final String metaDataPath = DefaultPathConstants.META_DATA;
List<String> childrenList = etcdClientGetChildrenByMap(metaDataPath, keysMap);
if (CollectionUtils.isNotEmpty(childrenList)) {
childrenList.forEach(children -> {
String realPath = buildRealPath(metaDataPath, children);
cacheMetaData(keysMap.get(realPath));
subscribeMetaDataChanges(realPath);
});
}
subscribeChildChanges(ConfigGroupEnum.META_DATA, metaDataPath);
}
private void subscribeChildChanges(final ConfigGroupEnum groupKey, final String groupParentPath) {
switch (groupKey) {
case SELECTOR:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheSelectorData(keysMap.get(updatePath));
subscribeSelectorDataChanges(updatePath);
}, null);
break;
case RULE:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheRuleData(keysMap.get(updatePath));
subscribeRuleDataChanges(updatePath);
}, null);
break;
case APP_AUTH:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheAuthData(keysMap.get(updatePath));
subscribeAppAuthDataChanges(updatePath);
}, null);
break;
case META_DATA:
etcdClient.watchChildChange(groupParentPath, (updatePath, updateValue) -> {
cacheMetaData(keysMap.get(updatePath));
subscribeMetaDataChanges(updatePath);
}, null);
break;
default:
throw new IllegalStateException("Unexpected groupKey: " + groupKey);
}
}
private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
etcdClient.watchDataChange(pluginPath, (updatePath, updateValue) -> {
final String dataPath = buildRealPath(pluginPath, updatePath);
final String dataStr = keysMap.get(dataPath);
final PluginData data = GsonUtils.getInstance().fromJson(dataStr, PluginData.class);
Optional.ofNullable(data)
.ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe(d)));
}, deleteNode -> deletePlugin(pluginName));
}
private void deletePlugin(final String pluginName) {
final PluginData data = new PluginData();
data.setName(pluginName);
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSubscribe(data));
}
private void subscribeSelectorDataChanges(final String path) {
etcdClient.watchDataChange(path, (updateNode, updateValue) -> cacheSelectorData(updateValue),
this::unCacheSelectorData);
}
private void subscribeRuleDataChanges(final String path) {
etcdClient.watchDataChange(path, (updatePath, updateValue) -> cacheRuleData(updateValue),
this::unCacheRuleData);
}
private void subscribeAppAuthDataChanges(final String realPath) {
etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> cacheAuthData(updateValue),
this::unCacheAuthData);
}
private void subscribeMetaDataChanges(final String realPath) {
etcdClient.watchDataChange(realPath, (updatePath, updateValue) -> cacheMetaData(updateValue),
this::deleteMetaData);
}
private void deleteMetaData(final String deletePath) {
final String path = deletePath.substring(DefaultPathConstants.META_DATA.length() + 1);
MetaData metaData = new MetaData();
try {
metaData.setPath(URLDecoder.decode(path, StandardCharsets.UTF_8.name()));
unCacheMetaData(metaData);
etcdClient.watchClose(path);
} catch (UnsupportedEncodingException e) {
LOG.error("delete meta data error.", e);
}
}
private void cachePluginData(final String dataString) {
final PluginData pluginData = GsonUtils.getInstance().fromJson(dataString, PluginData.class);
Optional.ofNullable(pluginData)
.flatMap(data -> Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> e.onSubscribe(pluginData));
}
private void cacheSelectorData(final String dataString) {
final SelectorData selectorData = GsonUtils.getInstance().fromJson(dataString, SelectorData.class);
Optional.ofNullable(selectorData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSelectorSubscribe(data)));
}
private void unCacheSelectorData(final String dataPath) {
SelectorData selectorData = new SelectorData();
final String selectorId = dataPath.substring(dataPath.lastIndexOf("/") + 1);
final String str = dataPath.substring(DefaultPathConstants.SELECTOR_PARENT.length());
final String pluginName = str.substring(1, str.length() - selectorId.length() - 1);
selectorData.setPluginName(pluginName);
selectorData.setId(selectorId);
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSelectorSubscribe(selectorData));
etcdClient.watchClose(dataPath);
}
private void cacheRuleData(final String dataString) {
final RuleData ruleData = GsonUtils.getInstance().fromJson(dataString, RuleData.class);
Optional.ofNullable(ruleData)
.ifPresent(data -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onRuleSubscribe(data)));
}
private void unCacheRuleData(final String dataPath) {
String substring = dataPath.substring(dataPath.lastIndexOf("/") + 1);
final String str = dataPath.substring(DefaultPathConstants.RULE_PARENT.length());
final String pluginName = str.substring(1, str.length() - substring.length() - 1);
final List<String> list = Lists.newArrayList(Splitter.on(DefaultPathConstants.SELECTOR_JOIN_RULE).split(substring));
RuleData ruleData = new RuleData();
ruleData.setPluginName(pluginName);
ruleData.setSelectorId(list.get(0));
ruleData.setId(list.get(1));
Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unRuleSubscribe(ruleData));
etcdClient.watchClose(dataPath);
}
private void cacheAuthData(final String dataString) {
final AppAuthData appAuthData = GsonUtils.getInstance().fromJson(dataString, AppAuthData.class);
Optional.ofNullable(appAuthData)
.ifPresent(data -> authDataSubscribers.forEach(e -> e.onSubscribe(data)));
}
private void unCacheAuthData(final String dataPath) {
final String key = dataPath.substring(DefaultPathConstants.APP_AUTH_PARENT.length() + 1);
AppAuthData appAuthData = new AppAuthData();
appAuthData.setAppKey(key);
authDataSubscribers.forEach(e -> e.unSubscribe(appAuthData));
etcdClient.watchClose(dataPath);
}
private void cacheMetaData(final String dataString) {
final MetaData metaData = GsonUtils.getInstance().fromJson(dataString, MetaData.class);
Optional.ofNullable(metaData)
.ifPresent(data -> metaDataSubscribers.forEach(e -> e.onSubscribe(metaData)));
}
private void unCacheMetaData(final MetaData metaData) {
Optional.ofNullable(metaData)
.ifPresent(data -> metaDataSubscribers.forEach(e -> e.unSubscribe(metaData)));
}
private String buildRealPath(final String parent, final String children) {
return String.join("/", parent, children);
}
private List<String> etcdClientGetChildren(final String parent) {
try {
return etcdClient.getChildrenKeys(parent, "/");
} catch (ExecutionException | InterruptedException e) {
LOG.error(e.getMessage(), e);
}
return Collections.emptyList();
}
private List<String> etcdClientGetChildrenByMap(final String parent, final Map<String, String> map) {
return etcdClient.getChildrenKeysByMap(parent, "/", map);
}
@Override
public void close() {
if (Objects.nonNull(etcdClient)) {
etcdClient.close();
}
}
}