blob: 86a7dc37aeb4ccf04a07a0d451037c91325e5e50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.sofa;
import com.alipay.sofa.registry.client.api.Publisher;
import com.alipay.sofa.registry.client.api.RegistryClientConfig;
import com.alipay.sofa.registry.client.api.Subscriber;
import com.alipay.sofa.registry.client.api.model.RegistryType;
import com.alipay.sofa.registry.client.api.model.UserData;
import com.alipay.sofa.registry.client.api.registration.PublisherRegistration;
import com.alipay.sofa.registry.client.api.registration.SubscriberRegistration;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClient;
import com.alipay.sofa.registry.client.provider.DefaultRegistryClientConfigBuilder;
import com.alipay.sofa.registry.core.model.ScopeEnum;
import com.google.gson.Gson;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.DefaultPage;
import org.apache.dubbo.common.utils.Page;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.rpc.RpcException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.ADDRESS_WAIT_TIME_KEY;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_DATA_CENTER;
import static org.apache.dubbo.registry.sofa.SofaRegistryConstants.LOCAL_REGION;
/**
 * {@link ServiceDiscovery} implementation that publishes and subscribes service instances
 * through a SOFA registry client.
 *
 * <p>Each service instance is serialized to JSON (as a {@code SofaRegistryInstance}) and published
 * under the service name as data id, in the {@code "dubbo"} group. Publishers and subscribers are
 * cached per service name so that a repeated registration republishes on the existing publisher
 * and a repeated subscription is not registered twice.
 */
public class SofaRegistryServiceDiscovery implements ServiceDiscovery {

    private static final Logger LOGGER = LoggerFactory.getLogger(SofaRegistryServiceDiscovery.class);

    /** Group used for every publisher and subscriber registered by this class. */
    private static final String DEFAULT_GROUP = "dubbo";

    private URL registryURL;

    private DefaultRegistryClient registryClient;

    /** Maximum time, in milliseconds, to wait for the first data push after subscribing. */
    private int waitAddressTimeout;

    private RegistryClientConfig registryClientConfig;

    /** Publishers created by {@link #register(ServiceInstance)}, keyed by service name. */
    private final Map<String, Publisher> publishers = new ConcurrentHashMap<>();

    /** Subscribers created by {@link #registerServiceWatcher}, keyed by service name. */
    private final Map<String, Subscriber> subscribers = new ConcurrentHashMap<>();

    /** The instance most recently passed to {@link #register(ServiceInstance)}. */
    private ServiceInstance serviceInstance;

    private final Gson gson = new Gson();

    /**
     * Builds and initializes the registry client from the host and port of the given URL and
     * reads the address wait timeout (default {@code 5000} ms) from the configuration.
     *
     * @param registryURL the registry URL supplying the registry endpoint host and port
     * @throws Exception if the client cannot be created or the timeout property is not an integer
     */
    @Override
    public void initialize(URL registryURL) throws Exception {
        this.registryURL = registryURL;

        this.registryClientConfig = DefaultRegistryClientConfigBuilder.start()
                .setDataCenter(LOCAL_DATA_CENTER)
                .setZone(LOCAL_REGION)
                .setRegistryEndpoint(registryURL.getHost())
                .setRegistryEndpointPort(registryURL.getPort()).build();

        registryClient = new DefaultRegistryClient(this.registryClientConfig);
        registryClient.init();

        this.waitAddressTimeout = Integer.parseInt(ConfigUtils.getProperty(ADDRESS_WAIT_TIME_KEY, "5000"));
    }

    /**
     * @return the registry URL passed to {@link #initialize(URL)}
     */
    @Override
    public URL getUrl() {
        return registryURL;
    }

    /**
     * Releases every publisher and subscriber registration still tracked by this instance.
     *
     * <p>Failures on individual registrations are logged and do not prevent the remaining ones
     * from being released.
     */
    @Override
    public void destroy() throws Exception {
        if (registryClient == null) {
            // Never initialized: nothing was registered remotely, only drop local state.
            publishers.clear();
            subscribers.clear();
            return;
        }
        unregisterAll(publishers, RegistryType.PUBLISHER);
        unregisterAll(subscribers, RegistryType.SUBSCRIBER);
    }

    /**
     * Unregisters every data id tracked in {@code registrations} and removes it from the map.
     */
    private void unregisterAll(Map<String, ?> registrations, RegistryType type) {
        // Iterate over a snapshot so entries can be removed while looping.
        for (String dataId : new ArrayList<>(registrations.keySet())) {
            try {
                registryClient.unregister(dataId, DEFAULT_GROUP, type);
            } catch (RuntimeException e) {
                LOGGER.warn("Failed to unregister " + type + " of dataId " + dataId, e);
            } finally {
                registrations.remove(dataId);
            }
        }
    }

    /**
     * Publishes the given instance under its service name, creating a publisher on first use and
     * republishing on the cached publisher afterwards. The instance is also remembered as the
     * local instance.
     *
     * @param serviceInstance the instance to publish
     */
    @Override
    public void register(ServiceInstance serviceInstance) throws RuntimeException {
        String serviceName = serviceInstance.getServiceName();
        SofaRegistryInstance sofaRegistryInstance = new SofaRegistryInstance(serviceInstance.getId(),
                serviceInstance.getHost(), serviceInstance.getPort(), serviceName, serviceInstance.getMetadata());
        Publisher publisher = publishers.get(serviceName);
        this.serviceInstance = serviceInstance;

        String payload = gson.toJson(sofaRegistryInstance);
        if (null == publisher) {
            PublisherRegistration registration = new PublisherRegistration(serviceName);
            registration.setGroup(DEFAULT_GROUP);
            publisher = registryClient.register(registration, payload);
            publishers.put(serviceName, publisher);
        } else {
            publisher.republish(payload);
        }
    }

    /**
     * Updates the published data of the given instance; equivalent to {@link #register(ServiceInstance)}.
     *
     * @param serviceInstance the instance to republish
     */
    @Override
    public void update(ServiceInstance serviceInstance) throws RuntimeException {
        register(serviceInstance);
    }

    /**
     * Unregisters the publisher of the given instance's service name and drops it from the local
     * cache, so that a later {@link #register(ServiceInstance)} creates a fresh publisher instead
     * of republishing on the unregistered one.
     *
     * @param serviceInstance the instance whose service name should be unregistered
     */
    @Override
    public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
        String serviceName = serviceInstance.getServiceName();
        registryClient.unregister(serviceName, DEFAULT_GROUP, RegistryType.PUBLISHER);
        // Only reached when the remote unregister did not throw.
        publishers.remove(serviceName);
    }

    /**
     * Subscribes to every service name of the listener.
     *
     * @param listener the listener to notify on instance changes
     */
    @Override
    public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
            throws NullPointerException, IllegalArgumentException {
        listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
    }

    /**
     * Registers a subscriber for {@code serviceName} unless one is already cached, then blocks
     * until the first data push arrives or the configured wait timeout elapses.
     *
     * <p>NOTE(review): when a subscriber already exists for the service name, the given listener
     * is not attached to it.
     *
     * @param serviceName the service name (data id) to subscribe to
     * @param listener    the listener notified with the decoded instances on every push
     */
    protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
        Subscriber subscriber = subscribers.get(serviceName);

        if (null == subscriber) {
            final CountDownLatch latch = new CountDownLatch(1);
            SubscriberRegistration subscriberRegistration = new SubscriberRegistration(serviceName,
                    (dataId, data) -> handleRegistryData(dataId, data, listener, latch));
            subscriberRegistration.setGroup(DEFAULT_GROUP);
            subscriberRegistration.setScopeEnum(ScopeEnum.global);

            subscriber = registryClient.register(subscriberRegistration);
            subscribers.put(serviceName, subscriber);

            waitAddress(serviceName, latch);
        }
    }

    /**
     * Returns the instances currently known to the subscriber of {@code serviceName}.
     *
     * <p>The whole instance list is returned as the page data; {@code offset} and {@code pageSize}
     * are only recorded on the returned page, and {@code healthyOnly} is not used.
     *
     * @param serviceName the service name to look up
     * @param offset      the requested offset, recorded on the returned page
     * @param pageSize    the requested page size, recorded on the returned page
     * @param healthyOnly not used by this implementation
     * @return a page holding all instances decoded from the subscriber's latest data
     * @throws RpcException if no subscriber has been registered for {@code serviceName}
     */
    @Override
    public Page<ServiceInstance> getInstances(String serviceName, int offset, int pageSize, boolean healthyOnly)
            throws NullPointerException, IllegalArgumentException, UnsupportedOperationException {
        Subscriber subscriber = subscribers.get(serviceName);

        if (null != subscriber) {
            List<ServiceInstance> serviceInstanceList =
                    handleRegistryData(serviceName, subscriber.peekData(), null, null);
            return new DefaultPage<>(offset, pageSize, serviceInstanceList, serviceInstanceList.size());
        }

        throw new RpcException("getInstances error!");
    }

    /**
     * Decodes the pushed registry data into service instances, notifies the listener (if any) and
     * finally releases the latch (if any), even when decoding or notification fails.
     *
     * @param dataId   the data id, used as the service name of the decoded instances
     * @param userData the pushed data, may be {@code null}
     * @param listener the listener to notify, or {@code null} for none
     * @param latch    the latch to count down once handling is done, or {@code null} for none
     * @return the decoded service instances, never {@code null}
     */
    private List<ServiceInstance> handleRegistryData(String dataId, UserData userData,
                                                     ServiceInstancesChangedListener listener,
                                                     CountDownLatch latch) {
        try {
            List<String> datas = getUserData(dataId, userData);
            List<ServiceInstance> serviceInstances = new ArrayList<>(datas.size());

            for (String serviceData : datas) {
                SofaRegistryInstance sri = gson.fromJson(serviceData, SofaRegistryInstance.class);
                if (sri == null) {
                    // Gson yields null for a null or empty JSON document; nothing to decode.
                    continue;
                }

                DefaultServiceInstance instance =
                        new DefaultServiceInstance(sri.getId(), dataId, sri.getHost(), sri.getPort());
                instance.setMetadata(sri.getMetadata());
                serviceInstances.add(instance);
            }

            if (null != listener) {
                listener.onEvent(new ServiceInstancesChangedEvent(dataId, serviceInstances));
            }

            return serviceInstances;
        } finally {
            if (null != latch) {
                latch.countDown();
            }
        }
    }

    /**
     * Waits up to the configured timeout for the latch, logging a warning on timeout. If the
     * waiting thread is interrupted, the interrupt flag is restored and the error is logged.
     *
     * @param serviceName    the service name being subscribed, used in log messages
     * @param countDownLatch the latch released by the first data push
     */
    private void waitAddress(String serviceName, CountDownLatch countDownLatch) {
        try {
            if (!countDownLatch.await(waitAddressTimeout, TimeUnit.MILLISECONDS)) {
                LOGGER.warn("Subscribe data failed by dataId " + serviceName);
            }
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            LOGGER.error("Error when wait Address!", e);
        }
    }

    /**
     * Extracts the flat list of address entries from the pushed data and logs it at INFO level.
     *
     * @param dataId   the data id, used in the log message
     * @param userData the user data, may be {@code null}
     * @return the address entries of all zones; empty when {@code userData} is {@code null}
     */
    protected List<String> getUserData(String dataId, UserData userData) {
        List<String> datas;
        if (userData == null) {
            datas = new ArrayList<>(0);
        } else {
            datas = flatUserData(userData);
        }

        // Build the (potentially large) address dump only when it is actually going to be logged.
        if (LOGGER.isInfoEnabled()) {
            StringBuilder sb = new StringBuilder();
            for (String provider : datas) {
                sb.append(" >>> ").append(provider).append("\n");
            }
            LOGGER.info("Receive updated RPC service addresses: service[" + dataId
                    + "]\n .Available target addresses size [" + datas.size() + "]\n"
                    + sb.toString());
        }

        return datas;
    }

    /**
     * Flattens the per-zone data of the given user data into a single list.
     *
     * @param userData the user data
     * @return the entries of all zones; empty when there is no zone data
     */
    protected List<String> flatUserData(UserData userData) {
        List<String> result = new ArrayList<>();

        Map<String, List<String>> zoneData = userData.getZoneData();
        if (zoneData == null) {
            return result;
        }

        for (List<String> zoneEntries : zoneData.values()) {
            if (zoneEntries != null) {
                result.addAll(zoneEntries);
            }
        }

        return result;
    }

    /**
     * @return the instance most recently passed to {@link #register(ServiceInstance)}, or
     *         {@code null} if none has been registered
     */
    @Override
    public ServiceInstance getLocalInstance() {
        return serviceInstance;
    }

    /**
     * Returns the names of the services this instance has subscribed to.
     *
     * <p>TODO: confirm this behaviour later.
     *
     * @return a read-only view of the subscribed service names
     */
    @Override
    public Set<String> getServices() {
        // Read-only view: callers must not be able to drop subscriber entries through the key set.
        return Collections.unmodifiableSet(subscribers.keySet());
    }
}