blob: 6d0a941594d8f051a6b775515493ec3765a3dabf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.registry.consumer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.servicecomb.foundation.common.VendorExtensions;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.event.CreateMicroserviceEvent;
import org.apache.servicecomb.registry.api.event.DestroyMicroserviceEvent;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstances;
import org.apache.servicecomb.registry.config.ServiceRegistryCommonConfig;
import org.apache.servicecomb.registry.definition.DefinitionConst;
import org.apache.servicecomb.registry.definition.MicroserviceNameParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MicroserviceVersions {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceVersions.class);
protected AppManager appManager;
protected String appId;
protected String shortName;
protected String microserviceName;
// revision and pulledInstances directly equals to SC's response
protected String revision = null;
private List<MicroserviceInstance> pulledInstances;
private MicroserviceInstances lastPulledResult;
// instances not always equals to pulledInstances
// in the future:
// pulledInstances means all instance
// instances means available instance
Collection<MicroserviceInstance> instances;
// key is service id
Map<String, MicroserviceVersion> versions = new ConcurrentHashMapEx<>();
// key is version rule
Map<String, MicroserviceVersionRule> versionRules = new ConcurrentHashMapEx<>();
// process pulled instances and create versionRule must be protected by lock
// otherwise maybe lost instance or version in versionRule
private final Object lock = new Object();
private long lastPullTime = 0;
private boolean waitingDelete = false;
private final VendorExtensions vendorExtensions = new VendorExtensions();
public MicroserviceVersions(AppManager appManager, String appId, String microserviceName) {
this.appManager = appManager;
this.appId = appId;
this.microserviceName = microserviceName;
this.shortName = new MicroserviceNameParser(appId, microserviceName).getShortName();
appManager.getEventBus().post(new CreateMicroserviceEvent(this));
LOGGER.info("create MicroserviceVersions, appId={}, microserviceName={}.",
appId,
microserviceName);
}
public boolean isWaitingDelete() {
return waitingDelete;
}
public MicroserviceVersions markWaitingDelete() {
this.waitingDelete = true;
return this;
}
public AppManager getAppManager() {
return appManager;
}
public String getAppId() {
return appId;
}
public String getMicroserviceName() {
return microserviceName;
}
public String getShortName() {
return shortName;
}
public Map<String, MicroserviceVersion> getVersions() {
return versions;
}
@SuppressWarnings("unchecked")
public <T extends MicroserviceVersion> T getVersion(String serviceId) {
return (T) versions.get(serviceId);
}
public String getRevision() {
return revision;
}
public void setRevision(String revision) {
this.revision = revision;
}
public List<MicroserviceInstance> getPulledInstances() {
return pulledInstances;
}
public long getLastPullTime() {
return lastPullTime;
}
public MicroserviceInstances getLastPulledResult() {
return lastPulledResult;
}
public VendorExtensions getVendorExtensions() {
return vendorExtensions;
}
public void pullInstances() {
lastPullTime = System.currentTimeMillis();
MicroserviceInstances microserviceInstances = findServiceInstances();
lastPulledResult = microserviceInstances;
if (microserviceInstances == null) {
// pulled failed, did not get anything
// will not do anything, consumers will use existing instances
return;
}
if (microserviceInstances.isMicroserviceNotExist()) {
// pulled failed, SC said target not exist
waitingDelete = true;
return;
}
if (null != revision && revision.equals(microserviceInstances.getRevision())) {
return;
}
pulledInstances = microserviceInstances.getInstancesResponse().getInstances();
pulledInstances.sort(Comparator.comparing(MicroserviceInstance::getInstanceId));
String rev = microserviceInstances.getRevision();
safeSetInstances(pulledInstances, rev);
}
protected MicroserviceInstances findServiceInstances() {
return DiscoveryManager.INSTANCE.findServiceInstances(appId,
microserviceName,
DefinitionConst.VERSION_RULE_ALL,
revision);
}
protected void safeSetInstances(List<MicroserviceInstance> pulledInstances, String rev) {
try {
List<MicroserviceInstance> filteredInstance = pulledInstances;
// 增加一个配置项只使用 `UP` 实例。 在使用 `TESTING` 进行拨测, 并且配置了
// servicecomb.references.version-rule=latest 场景,需要保证不使用
// `TESTING` 实例。 不能依赖 InstanceStatusDiscoveryFilter, 避免
// 构建的 VersionRule 实例列表为空。
if (ServiceRegistryCommonConfig.useUpInstancesOnly()) {
filteredInstance = pulledInstances.stream().filter(item -> MicroserviceInstanceStatus.UP == item.getStatus())
.collect(Collectors.toList());
}
setInstances(filteredInstance, rev);
} catch (Throwable e) {
waitingDelete = true;
LOGGER.error("Failed to setInstances, appId={}, microserviceName={}.",
getAppId(),
getMicroserviceName(),
e);
}
}
static class MergedInstances {
// key is microserviceId
Map<String, List<MicroserviceInstance>> microserviceIdMap = new HashMap<>();
// key is instanceId
Map<String, MicroserviceInstance> instanceIdMap = new HashMap<>();
void addInstance(MicroserviceInstance instance) {
instanceIdMap.put(instance.getInstanceId(), instance);
microserviceIdMap
.computeIfAbsent(instance.getServiceId(), key -> new ArrayList<>())
.add(instance);
}
}
private void setInstances(List<MicroserviceInstance> pulledInstances, String rev) {
synchronized (lock) {
MergedInstances mergedInstances = mergeInstances(pulledInstances, instances);
instances = mergedInstances.instanceIdMap.values();
// clear cache
versions.forEach((key, value) -> value.setInstances(new ArrayList<>()));
for (Entry<String, List<MicroserviceInstance>> entry : mergedInstances.microserviceIdMap.entrySet()) {
// ensure microserviceVersion exists
versions.computeIfAbsent(entry.getKey(),
microserviceId -> createMicroserviceVersion(microserviceId, entry.getValue()))
.setInstances(entry.getValue());
}
for (MicroserviceVersionRule microserviceVersionRule : versionRules.values()) {
microserviceVersionRule.update(versions, instances);
}
revision = rev;
}
}
protected MicroserviceVersion createMicroserviceVersion(String microserviceId, List<MicroserviceInstance> instances) {
return new MicroserviceVersion(this, microserviceId, microserviceName, instances);
}
private MergedInstances mergeInstances(List<MicroserviceInstance> pulledInstances,
Collection<MicroserviceInstance> inUseInstances) {
MergedInstances mergedInstances = new MergedInstances();
pulledInstances.stream().forEach(mergedInstances::addInstance);
MicroserviceInstancePing ping = SPIServiceUtils.getPriorityHighestService(MicroserviceInstancePing.class);
if (pulledInstances.isEmpty() && inUseInstances != null && ServiceRegistryCommonConfig
.isEmptyInstanceProtectionEnabled()) {
inUseInstances.stream().forEach(instance -> {
if (!mergedInstances.instanceIdMap.containsKey(instance.getInstanceId())) {
if (ping.ping(instance)) {
mergedInstances.addInstance(instance);
}
}
});
}
return mergedInstances;
}
public MicroserviceVersionRule getOrCreateMicroserviceVersionRule(String versionRule) {
// do not use computeIfAbsent
MicroserviceVersionRule microserviceVersionRule = versionRules.get(versionRule);
if (microserviceVersionRule == null) {
synchronized (lock) {
microserviceVersionRule = versionRules.computeIfAbsent(versionRule, this::createAndInitMicroserviceVersionRule);
}
}
return microserviceVersionRule;
}
protected MicroserviceVersionRule createAndInitMicroserviceVersionRule(String strVersionRule) {
LOGGER.info("create MicroserviceVersionRule, appId={}, microserviceName={}, versionRule={}.",
appId,
microserviceName,
strVersionRule);
MicroserviceVersionRule microserviceVersionRule =
new MicroserviceVersionRule(appId, microserviceName, strVersionRule);
microserviceVersionRule.update(versions, instances);
return microserviceVersionRule;
}
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
if (!isEventAccept(changedEvent)) {
return;
}
// pull instances always replace old instances, not append
//
// pull result and watch event sequence is not defined even inside SC.
// it's not safe to trust the event, so we just send a new pull request
//
// CREATE/UPDATE:
// if pull 1/2/3, and then add 4, but "add 4" received before pull result, will lost 4.
// DELETE:
// if pull 1/2/3, and then delete 3, but "delete 3" received before pull result, will have wrong 3.
// EXPIRE::
// black/white config in SC changed, we must refresh all data from sc.
pullInstances();
}
protected boolean isEventAccept(MicroserviceInstanceChangedEvent changedEvent) {
return (appId.equals(changedEvent.getKey().getAppId()) &&
microserviceName.equals(changedEvent.getKey().getServiceName())) ||
microserviceName.equals(
changedEvent.getKey().getAppId() + DefinitionConst.APP_SERVICE_SEPARATOR + changedEvent.getKey()
.getServiceName());
}
public void destroy() {
for (MicroserviceVersion microserviceVersion : versions.values()) {
microserviceVersion.destroy();
}
appManager.getEventBus().post(new DestroyMicroserviceEvent(this));
}
}