blob: 528553c1e3c1137e2d26b6e999d95406423c2c56 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.config.nacos.archaius.sources;
import static com.netflix.config.WatchedUpdateResult.createIncremental;
import static org.apache.servicecomb.config.nacos.client.ConfigurationAction.CREATE;
import static org.apache.servicecomb.config.nacos.client.ConfigurationAction.DELETE;
import static org.apache.servicecomb.config.nacos.client.ConfigurationAction.SET;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.config.ConfigMapping;
import org.apache.servicecomb.config.nacos.client.ConfigurationAction;
import org.apache.servicecomb.config.nacos.client.NacosClient;
import org.apache.servicecomb.config.nacos.client.NacosConfig;
import org.apache.servicecomb.config.spi.ConfigCenterConfigurationSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.netflix.config.WatchedUpdateListener;
import com.netflix.config.WatchedUpdateResult;
public class NacosConfigurationSourceImpl implements ConfigCenterConfigurationSource {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigCenterConfigurationSource.class);
private final Map<String, Object> valueCache = new ConcurrentHashMap<>();
private final List<WatchedUpdateListener> listeners = new CopyOnWriteArrayList<>();
public NacosConfigurationSourceImpl() {
}
private final UpdateHandler updateHandler = new UpdateHandler();
@VisibleForTesting
UpdateHandler getUpdateHandler() {
return updateHandler;
}
@Override
public int getOrder() {
return ORDER_BASE * 4;
}
@Override
public boolean isValidSource(Configuration localConfiguration) {
if (localConfiguration.getProperty(NacosConfig.SERVER_ADDR) == null) {
LOGGER.warn("Nacos configuration source is not configured!");
return false;
}
return true;
}
@Override
public void init(Configuration localConfiguration) {
NacosConfig.setConcurrentCompositeConfiguration(localConfiguration);
init();
}
private void init() {
NacosClient nacosClient = new NacosClient(updateHandler);
nacosClient.refreshNacosConfig();
}
@Override
public void addUpdateListener(WatchedUpdateListener watchedUpdateListener) {
listeners.add(watchedUpdateListener);
}
@Override
public void removeUpdateListener(WatchedUpdateListener watchedUpdateListener) {
listeners.remove(watchedUpdateListener);
}
private void updateConfiguration(WatchedUpdateResult result) {
for (WatchedUpdateListener l : listeners) {
try {
l.updateConfiguration(result);
} catch (Throwable ex) {
LOGGER.error("Error in invoking WatchedUpdateListener", ex);
}
}
}
@Override
public Map<String, Object> getCurrentData() throws Exception {
return valueCache;
}
public List<WatchedUpdateListener> getCurrentListeners() {
return listeners;
}
public class UpdateHandler {
public void handle(ConfigurationAction action, Map<String, Object> config) {
if (config == null || config.isEmpty()) {
return;
}
Map<String, Object> configuration = ConfigMapping.getConvertedMap(config);
if (CREATE.equals(action)) {
valueCache.putAll(configuration);
updateConfiguration(createIncremental(ImmutableMap.copyOf(configuration),
null,
null));
} else if (SET.equals(action)) {
valueCache.putAll(configuration);
updateConfiguration(createIncremental(null,
ImmutableMap.copyOf(configuration),
null));
} else if (DELETE.equals(action)) {
configuration.keySet().forEach(valueCache::remove);
updateConfiguration(createIncremental(null,
null,
ImmutableMap.copyOf(configuration)));
} else {
LOGGER.error("action: {} is invalid.", action.name());
return;
}
LOGGER.warn("Config value cache changed: action:{}; item:{}", action.name(), configuration.keySet());
}
}
}