blob: e966c37575148e1c57ad7cf8ebe39e654b6dd95d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dromara.soul.register.server.nacos;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.dromara.soul.common.enums.RpcTypeEnum;
import org.dromara.soul.common.utils.GsonUtils;
import org.dromara.soul.register.common.config.SoulRegisterCenterConfig;
import org.dromara.soul.register.common.dto.MetaDataRegisterDTO;
import org.dromara.soul.register.common.dto.URIRegisterDTO;
import org.dromara.soul.register.common.path.RegisterPathConstants;
import org.dromara.soul.register.server.api.SoulServerRegisterPublisher;
import org.dromara.soul.register.server.api.SoulServerRegisterRepository;
import org.dromara.soul.spi.Join;
import java.util.EnumSet;
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
/**
* nacos register server.
*
* @author lw1243925457
*/
@Slf4j
@Join
public class NacosServerRegisterRepository implements SoulServerRegisterRepository {
private static final EnumSet<RpcTypeEnum> RPC_TYPE_SET = EnumSet.of(RpcTypeEnum.DUBBO, RpcTypeEnum.GRPC,
RpcTypeEnum.HTTP, RpcTypeEnum.SPRING_CLOUD, RpcTypeEnum.SOFA, RpcTypeEnum.TARS);
private String defaultGroup = "default_group";
private ConfigService configService;
private NamingService namingService;
private SoulServerRegisterPublisher publisher;
private final ConcurrentSkipListSet<String> metadataConfigCache = new ConcurrentSkipListSet<>();
private final ConcurrentMap<String, ConcurrentSkipListSet<String>> uriServiceCache = new ConcurrentHashMap<>();
@Override
public void close() {
publisher.close();
}
@SneakyThrows
@Override
public void init(final SoulServerRegisterPublisher publisher, final SoulRegisterCenterConfig config) {
this.publisher = publisher;
String serverAddr = config.getServerLists();
Properties properties = config.getProps();
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
String nameSpace = "nacosNameSpace";
nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty(nameSpace));
this.configService = ConfigFactory.createConfigService(nacosProperties);
this.namingService = NamingFactory.createNamingService(nacosProperties);
subscribe();
}
private void subscribe() {
RPC_TYPE_SET.forEach(this::subscribeRpcTypeService);
}
@SneakyThrows
private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
Map<String, List<URIRegisterDTO>> services = new HashMap<>();
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
healthyInstances.forEach(healthyInstance -> {
String contextPath = healthyInstance.getMetadata().get("contextPath");
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
for (List<URIRegisterDTO> uriRegisterDTOList: services.values()) {
if (rpcType.equals(RpcTypeEnum.HTTP) || rpcType.equals(RpcTypeEnum.TARS) || rpcType.equals(RpcTypeEnum.GRPC)) {
publishRegisterURI(uriRegisterDTOList);
}
}
log.info("subscribe uri : {}", serviceName);
namingService.subscribe(serviceName, event -> {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
instances.forEach(instance -> {
String contextPath = instance.getMetadata().get("contextPath");
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
refreshURIService(rpcType.getName(), serviceName);
}
});
}
@SneakyThrows
private void subscribeMetadata(final String serviceConfigName) {
registerMetadata(readData(serviceConfigName));
log.info("subscribe metadata: {}", serviceConfigName);
configService.addListener(serviceConfigName, defaultGroup, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
@Override
public void receiveConfigInfo(final String config) {
registerMetadata(config);
}
});
}
private void registerMetadata(final String metadataConfig) {
List<String> metadataList = GsonUtils.getInstance().fromList(metadataConfig, String.class);
metadataList.forEach(this::publishMetadata);
}
private void publishMetadata(final String data) {
log.info("publish metadata: {}", data);
publisher.publish(Lists.newArrayList(GsonUtils.getInstance().fromJson(data, MetaDataRegisterDTO.class)));
}
private void refreshURIService(final String rpcType, final String serviceName) {
for (String contextPath: uriServiceCache.get(serviceName)) {
registerURI(contextPath, serviceName, rpcType);
}
}
@SneakyThrows
private void registerURI(final String contextPath, final String serviceName, final String rpcType) {
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
List<URIRegisterDTO> registerDTOList = new ArrayList<>();
healthyInstances.forEach(healthyInstance -> {
if (contextPath.equals(healthyInstance.getMetadata().get("contextPath"))) {
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
registerDTOList.add(uriRegisterDTO);
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType, contextPath);
if (!metadataConfigCache.contains(serviceConfigName)) {
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
}
}
});
if (rpcType.equals(RpcTypeEnum.HTTP.getName()) || rpcType.equals(RpcTypeEnum.TARS.getName()) || rpcType.equals(RpcTypeEnum.GRPC.getName())) {
return;
}
if (registerDTOList.isEmpty()) {
URIRegisterDTO uriRegisterDTO = URIRegisterDTO.builder().contextPath("/" + contextPath).build();
registerDTOList.add(uriRegisterDTO);
}
publishRegisterURI(registerDTOList);
}
private void publishRegisterURI(final List<URIRegisterDTO> registerDTOList) {
log.info("publish uri: {}", registerDTOList);
publisher.publish(registerDTOList);
}
@SneakyThrows
private String readData(final String configName) {
return configService.getConfig(configName, defaultGroup, 5000);
}
}