blob: 6896ed5347938bb4c2de65f80704fb5ef7d2e7ce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.serviceregistry.registry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
import org.apache.servicecomb.registry.api.registry.BasePath;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceFactory;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstances;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
import org.apache.servicecomb.serviceregistry.api.Const;
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache;
import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache;
import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask;
import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.MoreExecutors;
public abstract class AbstractServiceRegistry implements ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServiceRegistry.class);
private final MicroserviceFactory microserviceFactory = new MicroserviceFactory();
protected EventBus eventBus;
protected Microservice microservice;
protected ServiceRegistryClient srClient;
protected ServiceRegistryConfig serviceRegistryConfig;
protected ServiceCenterTask serviceCenterTask;
protected ExecutorService executorService = MoreExecutors.newDirectExecutorService();
private String name;
RefreshableServiceRegistryCache serviceRegistryCache;
public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig,
Configuration configuration) {
setName(serviceRegistryConfig.getRegistryName());
this.eventBus = eventBus;
this.serviceRegistryConfig = serviceRegistryConfig;
this.microservice = microserviceFactory.create(configuration);
}
@Override
public void init() {
if (srClient == null) {
srClient = createServiceRegistryClient();
eventBus.register(srClient);
}
createServiceCenterTask();
eventBus.register(this);
initCache();
}
private void initCache() {
serviceRegistryCache = new RefreshableServiceRegistryCache(microservice, srClient);
serviceRegistryCache.setCacheRefreshedWatcher(
caches -> eventBus.post(new MicroserviceCacheRefreshedEvent(caches)));
}
@Override
public EventBus getEventBus() {
return eventBus;
}
@Override
public ServiceRegistryClient getServiceRegistryClient() {
return srClient;
}
public void setServiceRegistryClient(ServiceRegistryClient serviceRegistryClient) {
this.srClient = serviceRegistryClient;
}
@Override
public String getAppId() {
return microservice.getAppId();
}
protected abstract ServiceRegistryClient createServiceRegistryClient();
@Override
public void run() {
loadStaticConfiguration();
// try register
// if failed, then retry in thread
serviceCenterTask.init();
}
private void loadStaticConfiguration() {
// TODO 如果yaml定义了paths规则属性,替换默认值,现需要DynamicPropertyFactory支持数组获取
List<BasePath> paths = microservice.getPaths();
for (BasePath path : paths) {
if (path.getProperty() == null) {
path.setProperty(new HashMap<>());
}
path.getProperty().put(Const.PATH_CHECKSESSION, "false");
}
}
private void createServiceCenterTask() {
MicroserviceServiceCenterTask task =
new MicroserviceServiceCenterTask(eventBus, serviceRegistryConfig, srClient, microservice);
serviceCenterTask = new ServiceCenterTask(eventBus, serviceRegistryConfig.getHeartbeatInterval(),
task);
}
public boolean unregisterInstance() {
MicroserviceInstance microserviceInstance = microservice.getInstance();
if (microserviceInstance.getInstanceId() == null || microserviceInstance.getServiceId() == null) {
return true;
}
boolean result = srClient.unregisterMicroserviceInstance(microserviceInstance.getServiceId(),
microserviceInstance.getInstanceId());
if (!result) {
LOGGER.error("Unregister microservice instance failed. microserviceId={} instanceId={}",
microserviceInstance.getServiceId(),
microserviceInstance.getInstanceId());
return false;
}
LOGGER.info("Unregister microservice instance success. microserviceId={} instanceId={}",
microserviceInstance.getServiceId(),
microserviceInstance.getInstanceId());
return true;
}
@Override
public List<MicroserviceInstance> findServiceInstance(String appId, String serviceName,
String versionRule) {
MicroserviceInstances instances = findServiceInstances(appId, serviceName, versionRule);
if (instances == null || instances.isMicroserviceNotExist()) {
return null;
}
return instances.getInstancesResponse().getInstances();
}
@Override
public MicroserviceInstances findServiceInstances(String appId, String serviceName,
String versionRule) {
MicroserviceCache microserviceCache = serviceRegistryCache
.findServiceCache(MicroserviceCacheKey.builder()
.serviceName(serviceName).appId(appId)
.env(microservice.getEnvironment())
.versionRule(versionRule)
.build());
return RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache);
}
@Override
public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) {
return serviceRegistryCache.findServiceCache(microserviceCacheKey);
}
@Override
public boolean updateMicroserviceProperties(Map<String, String> properties) {
boolean success = srClient.updateMicroserviceProperties(microservice.getServiceId(),
properties);
if (success) {
microservice.setProperties(properties);
}
return success;
}
@Override
public boolean updateInstanceProperties(Map<String, String> instanceProperties) {
MicroserviceInstance microserviceInstance = microservice.getInstance();
boolean success = srClient.updateInstanceProperties(microserviceInstance.getServiceId(),
microserviceInstance.getInstanceId(),
instanceProperties);
if (success) {
microserviceInstance.setProperties(instanceProperties);
}
return success;
}
@Override
public Microservice getRemoteMicroservice(String microserviceId) {
return srClient.getMicroservice(microserviceId);
}
@Override
public Microservice getAggregatedRemoteMicroservice(String microserviceId) {
return srClient.getAggregatedMicroservice(microserviceId);
}
@Override
public Microservice getMicroservice() {
return microservice;
}
@Override
public List<Microservice> getAllMicroservices() {
return srClient.getAllMicroservices();
}
@Override
public MicroserviceInstance getMicroserviceInstance() {
return microservice.getInstance();
}
@Override
public void destroy() {
eventBus.post(new ShutdownEvent());
unregisterInstance();
}
@Override
public String getName() {
return name;
}
void setName(String name) {
RegistryUtils.validateRegistryName(name);
this.name = name;
}
public ServiceRegistryCache getServiceRegistryCache() {
return serviceRegistryCache;
}
@Subscribe
public void onShutdown(ShutdownEvent event) {
LOGGER.info("service center task is shutdown.");
executorService.shutdownNow();
}
// post from watch eventloop, should refresh the exact microservice instances immediately
@Subscribe
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
executorService.execute(new SuppressedRunnableWrapper(
() -> {
serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
}));
}
}