blob: 22b804eacbc55a75163d4e21e81aed5ea8ed771b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.sofa;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.RegistryNotifier;
import org.apache.dubbo.registry.support.CacheableFailbackRegistry;
import com.alipay.sofa.registry.client.api.RegistryClient;
import com.alipay.sofa.registry.client.api.RegistryClientConfig;
import com.alipay.sofa.registry.client.api.Subscriber;
import com.alipay.sofa.registry.client.api.SubscriberDataObserver;
import com.alipay.sofa.registry.client.api.model.RegistryType;
import com.alipay.sofa.registry.client.api.model.UserData;
import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SUBSCRIBE_KEY;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.DEFAULT_GROUP;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
/**
* The Sofa registry.
*
* @since 2.7.2
*/
public class SofaRegistry extends CacheableFailbackRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistry.class);
/**
* Cache subscriber by dataId
*/
private final Map<String, Subscriber> subscribers = new ConcurrentHashMap<>();
/**
* Direct registry client
*/
private RegistryClient registryClient;
/**
* wait address from registry
*/
private int waitAddressTimeout;
/**
* Instantiates a new Sofa registry.
*
* @param url the url
*/
public SofaRegistry(URL url) {
super(url);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Build sofa registry by url:" + url);
}
this.registryClient = buildClient(url);
this.waitAddressTimeout = Integer.parseInt(ConfigUtils.getProperty(ADDRESS_WAIT_TIME_KEY, "5000"));
}
/**
* Build client registry client.
*
* @param url the url
* @return the registry client
*/
protected RegistryClient buildClient(URL url) {
RegistryClientConfig config = DefaultRegistryClientConfigBuilder.start()
.setDataCenter(LOCAL_DATA_CENTER)
.setZone(LOCAL_REGION)
.setRegistryEndpoint(url.getHost())
.setRegistryEndpointPort(url.getPort()).build();
DefaultRegistryClient registryClient = new DefaultRegistryClient(config);
registryClient.init();
return registryClient;
}
@Override
public boolean isAvailable() {
return true;
}
@Override
public void doRegister(URL url) {
if (!url.getParameter(REGISTER_KEY, true)
|| CONSUMER_PROTOCOL.equals(url.getProtocol())) {
return;
}
String serviceName = buildServiceName(url);
String serviceData = url.toFullString();
PublisherRegistration registration = new PublisherRegistration(serviceName);
addAttributesForPub(registration);
registryClient.register(registration, serviceData);
}
/**
* Add attributes for pub.
*
* @param publisherRegistration the publisher registration
*/
protected void addAttributesForPub(PublisherRegistration publisherRegistration) {
publisherRegistration.setGroup(DEFAULT_GROUP);
}
@Override
public void doUnregister(URL url) {
if (!url.getParameter(REGISTER_KEY, true)
|| CONSUMER_PROTOCOL.equals(url.getProtocol())) {
return;
}
String serviceName = buildServiceName(url);
registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.PUBLISHER);
}
@Override
public void doSubscribe(URL url, final NotifyListener listener) {
if (!url.getParameter(SUBSCRIBE_KEY, true)
|| PROVIDER_PROTOCOL.equals(url.getProtocol())) {
return;
}
String serviceName = buildServiceName(url);
// com.alipay.test.TestService:1.0:group@dubbo
Subscriber listSubscriber = subscribers.get(serviceName);
if (listSubscriber != null) {
LOGGER.warn("Service name [" + serviceName + "] have bean registered in SOFARegistry.");
CountDownLatch countDownLatch = new CountDownLatch(1);
handleRegistryData(url, listSubscriber.peekData(), listener, countDownLatch);
waitAddress(serviceName, countDownLatch);
return;
}
final CountDownLatch latch = new CountDownLatch(1);
SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
new RegistryChildListenerImpl(url, serviceName, listener, latch));
addAttributesForSub(subscriberRegistration);
listSubscriber = registryClient.register(subscriberRegistration);
subscribers.put(serviceName, listSubscriber);
waitAddress(serviceName, latch);
}
private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
try {
if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Subscribe data failed by dataId " + serviceName);
}
} catch (Exception e) {
LOGGER.error("Error when wait Address!", e);
}
}
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
if (!url.getParameter(SUBSCRIBE_KEY, true)
|| PROVIDER_PROTOCOL.equals(url.getProtocol())) {
return;
}
String serviceName = buildServiceName(url);
registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.SUBSCRIBER);
}
private void handleRegistryData(URL subscribeUrl, UserData data, NotifyListener notifyListener,
CountDownLatch latch) {
try {
List<URL> urls = toUrlsWithEmpty(subscribeUrl, PROVIDERS_CATEGORY, flatUserData(data));
notifyListener.notify(urls);
} finally {
latch.countDown();
}
}
@Override
protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
return UrlUtils.isMatch(subscribeUrl, providerUrl);
}
private String buildServiceName(URL url) {
// return url.getServiceKey();
StringBuilder buf = new StringBuilder();
buf.append(url.getServiceInterface());
String version = url.getParameter(VERSION_KEY);
if (StringUtils.isNotEmpty(version)) {
buf.append(":").append(version);
}
String group = url.getParameter(GROUP_KEY);
if (StringUtils.isNotEmpty(group)) {
buf.append(":").append(group);
}
buf.append("@").append(DUBBO);
return buf.toString();
}
/**
* Print address data.
*
* @param dataId the data id
* @param userData the user data
*/
protected void printAddressData(String dataId, UserData userData) {
List<String> datas;
if (userData == null) {
datas = new ArrayList<>(0);
} else {
datas = flatUserData(userData);
}
StringBuilder sb = new StringBuilder();
for (String provider : datas) {
sb.append(" >>> ").append(provider).append("\n");
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Receive updated RPC service addresses: service[" + dataId
+ "]\n .Available target addresses size [" + datas.size() + "]\n"
+ sb.toString());
}
}
/**
* Add attributes for sub.
*
* @param subscriberRegistration the subscriber registration
*/
protected void addAttributesForSub(SubscriberRegistration subscriberRegistration) {
subscriberRegistration.setGroup(DEFAULT_GROUP);
subscriberRegistration.setScopeEnum(ScopeEnum.global);
}
/**
* Flat user data list.
*
* @param userData the user data
* @return the list
*/
protected List<String> flatUserData(UserData userData) {
List<String> result = new ArrayList<>();
if (userData == null) {
return result;
}
Map<String, List<String>> zoneData = userData.getZoneData();
for (Map.Entry<String, List<String>> entry : zoneData.entrySet()) {
result.addAll(entry.getValue());
}
return result;
}
private class RegistryChildListenerImpl implements SubscriberDataObserver {
private RegistryNotifier notifier;
public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
notifier = new RegistryNotifier(SofaRegistry.this) {
@Override
protected void doNotify(Object rawAddresses) {
//record change
printAddressData(path, (UserData) rawAddresses);
handleRegistryData(consumerUrl, (UserData) rawAddresses, listener, latch);
}
};
}
@Override
public void handleData(String dataId, UserData data) {
notifier.notify(data);
}
}
}