blob: 8b021e9ce65ff337d6d4baf917fedd747eb38a43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.polaris;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.common.registry.Consts;
import com.tencent.polaris.common.registry.PolarisOperator;
import com.tencent.polaris.common.utils.ExtensionConsts;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.polaris.task.FetchTask;
import org.apache.dubbo.registry.polaris.task.InstancesHandler;
import org.apache.dubbo.registry.polaris.task.TaskScheduler;
import org.apache.dubbo.registry.polaris.task.WatchTask;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.cluster.Constants;
import org.apache.dubbo.rpc.cluster.RouterFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public class PolarisRegistry extends FailbackRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class);
private static final TaskScheduler taskScheduler = new TaskScheduler();
private final Set<URL> registeredInstances = new ConcurrentHashSet<>();
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final Map<NotifyListener, ServiceListener> serviceListeners = new ConcurrentHashMap<>();
private final PolarisOperator polarisOperator;
private final boolean hasCircuitBreaker;
private final boolean hasRouter;
public PolarisRegistry(URL url) {
super(url);
polarisOperator = PolarisRegistryUtils.getOrCreatePolarisOperator(url);
ExtensionLoader<RouterFactory> routerExtensionLoader = ExtensionLoader.getExtensionLoader(RouterFactory.class);
hasRouter = routerExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_ROUTER_NAME);
ExtensionLoader<Filter> filterExtensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
hasCircuitBreaker = filterExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_CIRCUITBREAKER_NAME);
}
private URL buildRouterURL(URL consumerUrl) {
URL routerURL = null;
if (hasRouter) {
URL registryURL = getUrl();
routerURL = new URL(RegistryConstants.ROUTE_PROTOCOL, registryURL.getHost(), registryURL.getPort());
routerURL = routerURL.setServiceInterface(CommonConstants.ANY_VALUE);
routerURL = routerURL.addParameter(Constants.ROUTER_KEY, ExtensionConsts.PLUGIN_ROUTER_NAME);
String consumerGroup = consumerUrl.getParameter(CommonConstants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(CommonConstants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(CommonConstants.CLASSIFIER_KEY);
if (null != consumerGroup) {
routerURL = routerURL.addParameter(CommonConstants.GROUP_KEY, consumerGroup);
}
if (null != consumerVersion) {
routerURL = routerURL.addParameter(CommonConstants.VERSION_KEY, consumerVersion);
}
if (null != consumerClassifier) {
routerURL = routerURL.addParameter(CommonConstants.CLASSIFIER_KEY, consumerClassifier);
}
}
return routerURL;
}
@Override
public void doRegister(URL url) {
if (!shouldRegister(url)) {
return;
}
LOGGER.info(String.format("[POLARIS] register service to polaris: %s", url.toString()));
Map<String, String> metadata = new HashMap<>(url.getParameters());
metadata.put(CommonConstants.PATH_KEY, url.getPath());
int port = url.getPort();
if (port > 0) {
int weight = url.getParameter(Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
String version = url.getParameter(CommonConstants.VERSION_KEY, "");
polarisOperator.register(url.getServiceInterface(), url.getHost(), port, url.getProtocol(), version, weight,
metadata);
registeredInstances.add(url);
} else {
LOGGER.warn(String.format("[POLARIS] skip register url %s for zero port value", url));
}
}
private boolean shouldRegister(URL url) {
return !StringUtils.equals(url.getProtocol(), CommonConstants.CONSUMER);
}
@Override
public void doUnregister(URL url) {
if (!shouldRegister(url)) {
return;
}
LOGGER.info(String.format("[POLARIS] unregister service from polaris: %s", url.toString()));
int port = url.getPort();
if (port > 0) {
polarisOperator.deregister(url.getServiceInterface(), url.getHost(), url.getPort());
registeredInstances.remove(url);
}
}
@Override
public void destroy() {
if (destroyed.compareAndSet(false, true)) {
super.destroy();
Collection<URL> urls = Collections.unmodifiableCollection(registeredInstances);
for (URL url : urls) {
doUnregister(url);
}
PolarisRegistryUtils.removePolarisOperator(getUrl());
polarisOperator.destroy();
taskScheduler.destroy();
}
}
@Override
public void doSubscribe(URL url, NotifyListener listener) {
String service = url.getServiceInterface();
Instance[] instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker);
onInstances(url, listener, instances);
LOGGER.info(String.format("[POLARIS] submit watch task for service %s", service));
PolarisInstancesHandler polarisInstancesHandler = new PolarisInstancesHandler(url, listener);
FetchTask fetchTask = new FetchTask(
url.getServiceInterface(), polarisInstancesHandler, polarisOperator, !hasCircuitBreaker);
taskScheduler.submitWatchTask(new WatchTask(url.getServiceInterface(), fetchTask, taskScheduler));
}
private void onInstances(URL url, NotifyListener listener, Instance[] instances) {
LOGGER.info(String.format("[POLARIS] update instances count: %d, service: %s", null == instances ? 0 : instances.length,
url.getServiceInterface()));
List<URL> urls = new ArrayList<>();
if (null != instances) {
for (Instance instance : instances) {
urls.add(instanceToURL(instance));
}
}
URL routerURL = buildRouterURL(url);
if (null != routerURL) {
urls.add(routerURL);
}
PolarisRegistry.this.notify(url, listener, urls);
}
private static URL instanceToURL(Instance instance) {
Map<String, String> newMetadata = new HashMap<>(instance.getMetadata());
boolean hasWeight = false;
if (newMetadata.containsKey(Constants.WEIGHT_KEY)) {
String weightStr = newMetadata.get(Constants.WEIGHT_KEY);
try {
int weightValue = Integer.parseInt(weightStr);
if (weightValue == instance.getWeight()) {
hasWeight = true;
}
} catch (Exception ignored) {
}
}
if (!hasWeight) {
newMetadata.put(Constants.WEIGHT_KEY, Integer.toString(instance.getWeight()));
}
newMetadata.put(Consts.INSTANCE_KEY_ID, instance.getId());
newMetadata.put(Consts.INSTANCE_KEY_HEALTHY, Boolean.toString(instance.isHealthy()));
newMetadata.put(Consts.INSTANCE_KEY_ISOLATED, Boolean.toString(instance.isIsolated()));
clearEmptyKeys(newMetadata, new String[]{CommonConstants.VERSION_KEY, CommonConstants.GROUP_KEY});
return new URL(instance.getProtocol(),
instance.getHost(),
instance.getPort(),
newMetadata.get(CommonConstants.PATH_KEY),
newMetadata);
}
private static void clearEmptyKeys(Map<String, String> parameters, String[] keys) {
for (String key : keys) {
String value = parameters.get(key);
if (null != value && StringUtils.isBlank(value)) {
parameters.remove(key);
}
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
LOGGER.info(String.format("[polaris] unsubscribe service: %s", url.toString()));
taskScheduler.submitWatchTask(new Runnable() {
@Override
public void run() {
ServiceListener serviceListener = serviceListeners.remove(listener);
if (null != serviceListener) {
polarisOperator.unwatchService(url.getServiceInterface(), serviceListener);
}
}
});
}
public PolarisOperator getPolarisOperator() {
return polarisOperator;
}
@Override
public boolean isAvailable() {
return true;
}
private class PolarisInstancesHandler implements InstancesHandler {
private final URL url;
private final NotifyListener listener;
public PolarisInstancesHandler(URL url, NotifyListener listener) {
this.url = url;
this.listener = listener;
}
@Override
public void onInstances(String serviceName, Instance[] instances) {
PolarisRegistry.this.onInstances(url, listener, instances);
}
@Override
public void onWatchSuccess(String serviceName, ServiceListener serviceListener) {
serviceListeners.put(listener, serviceListener);
}
}
}