blob: 6f4f1a098298213011547fc7255add5d196ce862 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.loadbalance;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.config.ConfigurationChangedEvent;
import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.Transport;
import org.apache.servicecomb.core.filter.AbstractFilter;
import org.apache.servicecomb.core.filter.ConsumerFilter;
import org.apache.servicecomb.core.filter.EdgeFilter;
import org.apache.servicecomb.core.filter.Filter;
import org.apache.servicecomb.core.filter.FilterNode;
import org.apache.servicecomb.core.governance.RetryContext;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.loadbalance.Configuration.RuleType;
import org.apache.servicecomb.registry.discovery.DiscoveryContext;
import org.apache.servicecomb.registry.discovery.DiscoveryTree;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.Subscribe;
import io.github.resilience4j.core.metrics.Metrics.Outcome;
import jakarta.ws.rs.core.Response.Status;
public class LoadBalanceFilter extends AbstractFilter implements ConsumerFilter, EdgeFilter {
public static final String CONTEXT_KEY_LAST_SERVER = "x-context-last-server";
// Enough times to make sure to choose a different server in high volume.
private static final int COUNT = 17;
public static final String CONTEXT_KEY_SERVER_LIST = "x-context-server-list";
public static final String SERVICECOMB_SERVER_ENDPOINT = "scb-endpoint";
private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalanceFilter.class);
private final DiscoveryTree discoveryTree;
// key为grouping filter qualified name
private final Map<String, LoadBalancer> loadBalancerMap = new ConcurrentHashMapEx<>();
private final Object lock = new Object();
private final ExtensionsManager extensionsManager;
private final SCBEngine scbEngine;
// set endpoint in invocation.localContext
// ignore logic of loadBalance
public boolean supportDefinedEndpoint;
@Autowired
public LoadBalanceFilter(ExtensionsManager extensionsManager,
DiscoveryTree discoveryTree, SCBEngine scbEngine) {
preCheck(scbEngine);
this.scbEngine = scbEngine;
this.supportDefinedEndpoint = this.scbEngine.getEnvironment()
.getProperty("servicecomb.loadbalance.userDefinedEndpoint.enabled",
boolean.class, false);
this.extensionsManager = extensionsManager;
this.discoveryTree = discoveryTree;
EventManager.register(this);
}
@Subscribe
@SuppressWarnings("unused")
public void onConfigurationChangedEvent(ConfigurationChangedEvent event) {
Set<String> changedKeys = event.getChanged();
for (String key : changedKeys) {
if (key.startsWith(Configuration.ROOT)) {
synchronized (lock) {
clearLoadBalancer();
}
LOGGER.info("clear load balance rule for configuration changed, {}", key);
break;
}
}
}
private void preCheck(SCBEngine scbEngine) {
// Old configurations check.Just print an error, because configurations may given in dynamic and fail on runtime.
String policyName = scbEngine.getEnvironment()
.getProperty("servicecomb.loadbalance.NFLoadBalancerRuleClassName");
if (!StringUtils.isEmpty(policyName)) {
LOGGER.error("[servicecomb.loadbalance.NFLoadBalancerRuleClassName] is not supported anymore." +
"use [servicecomb.loadbalance.strategy.name] instead.");
}
String filterNames = scbEngine.getEnvironment()
.getProperty("servicecomb.loadbalance.serverListFilters");
if (!StringUtils.isEmpty(filterNames)) {
LOGGER.error(
"Server list implementation changed to SPI. Configuration [servicecomb.loadbalance.serverListFilters]" +
" is not used any more. For ServiceComb defined filters, you do not need config and can "
+ "remove this configuration safely. If you define your own filter, need to change it to SPI to make it work.");
}
}
@Override
public int getOrder() {
return Filter.CONSUMER_LOAD_BALANCE_ORDER;
}
@Override
public String getName() {
return "load-balance";
}
@Override
public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
try {
if (handleSuppliedEndpoint(invocation)) {
invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, false);
return nextNode.onFilter(invocation);
}
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, true);
LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);
return send(invocation, nextNode, loadBalancer);
}
// user's can invoke a service by supplying target Endpoint.
// in this case, we do not using load balancer, and no stats of server calculated, no retrying.
private boolean handleSuppliedEndpoint(Invocation invocation) throws Exception {
if (invocation.getEndpoint() != null) {
return true;
}
if (supportDefinedEndpoint) {
return defineEndpointAndHandle(invocation);
}
return false;
}
private Endpoint parseEndpoint(String endpointUri) throws Exception {
URI formatUri = new URI(endpointUri);
Transport transport = scbEngine.getTransportManager().findTransport(formatUri.getScheme());
if (transport == null) {
LOGGER.error("not deployed transport {}, ignore {}.", formatUri.getScheme(), endpointUri);
throw new InvocationException(Status.BAD_REQUEST,
"the endpoint's transport is not found.");
}
return new Endpoint(transport, endpointUri);
}
private boolean defineEndpointAndHandle(Invocation invocation) throws Exception {
Object endpoint = invocation.getLocalContext(SERVICECOMB_SERVER_ENDPOINT);
if (endpoint == null) {
return false;
}
if (endpoint instanceof String) {
// compatible to old usage
endpoint = parseEndpoint((String) endpoint);
}
invocation.setEndpoint((Endpoint) endpoint);
return true;
}
private void clearLoadBalancer() {
loadBalancerMap.clear();
}
@VisibleForTesting
CompletableFuture<Response> send(Invocation invocation, FilterNode filterNode, LoadBalancer chosenLB) {
long time = System.currentTimeMillis();
ServiceCombServer server = chooseServer(invocation, chosenLB);
if (null == server) {
return CompletableFuture.failedFuture(
new InvocationException(Status.INTERNAL_SERVER_ERROR,
String.format("No available address found for %s/%s.", invocation.getAppId(),
invocation.getMicroserviceName())));
}
invocation.setEndpoint(server.getEndpoint());
return filterNode.onFilter(invocation).whenComplete((r, e) -> {
if (e != null || isFailedResponse(r)) {
server.getServerMetrics().record((System.currentTimeMillis() - time), TimeUnit.MILLISECONDS, Outcome.ERROR);
} else {
server.getServerMetrics().record((System.currentTimeMillis() - time), TimeUnit.MILLISECONDS, Outcome.SUCCESS);
}
});
}
private ServiceCombServer chooseServer(Invocation invocation, LoadBalancer chosenLB) {
RetryContext retryContext = invocation.getLocalContext(RetryContext.RETRY_CONTEXT);
if (retryContext == null) {
return chosenLB.chooseServer(invocation);
}
if (!retryContext.isRetry()) {
ServiceCombServer server = chosenLB.chooseServer(invocation);
invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, server);
return server;
}
ServiceCombServer lastServer = invocation.getLocalContext(CONTEXT_KEY_LAST_SERVER);
ServiceCombServer nextServer = lastServer;
if (!retryContext.trySameServer()) {
for (int i = 0; i < COUNT; i++) {
ServiceCombServer s = chosenLB.chooseServer(invocation);
if (s == null) {
break;
}
if (!s.equals(nextServer)) {
nextServer = s;
break;
}
}
}
LOGGER.warn("operation failed {}, retry to instance [{}], last instance [{}], trace id {}",
invocation.getMicroserviceQualifiedName(),
nextServer == null ? "" : nextServer.getEndpoint().getEndpoint(),
lastServer == null ? "" : nextServer.getEndpoint().getEndpoint(),
invocation.getTraceId());
invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, nextServer);
return nextServer;
}
protected boolean isFailedResponse(Response resp) {
if (resp.isFailed()) {
if (resp.getResult() instanceof InvocationException) {
InvocationException e = resp.getResult();
return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE
|| e.getStatusCode() == Status.SERVICE_UNAVAILABLE.getStatusCode()
|| e.getStatusCode() == Status.REQUEST_TIMEOUT.getStatusCode();
} else {
return true;
}
} else {
return false;
}
}
protected LoadBalancer getOrCreateLoadBalancer(Invocation invocation) {
DiscoveryContext context = new DiscoveryContext();
context.setInputParameters(invocation);
VersionedCache serversVersionedCache = discoveryTree.discovery(context,
invocation.getAppId(),
invocation.getMicroserviceName());
invocation.addLocalContext(CONTEXT_KEY_SERVER_LIST, serversVersionedCache.data());
RuleType ruleType = Configuration.INSTANCE.getRuleStrategyName(invocation);
String cacheKey;
if (ruleType.getType() == RuleType.TYPE_SCHEMA) {
cacheKey = invocation.getAppId() + "-" +
invocation.getMicroserviceName() + "-" + invocation.getSchemaId();
} else {
cacheKey = invocation.getAppId() + "-" +
invocation.getMicroserviceName() + "-" + invocation.getSchemaId() + "-" + invocation.getOperationName();
}
return loadBalancerMap.computeIfAbsent(cacheKey,
key -> createLoadBalancer(ruleType, key, invocation.getMicroserviceName()));
}
private LoadBalancer createLoadBalancer(RuleType ruleType, String cacheKey, String microserviceName) {
RuleExt rule = extensionsManager.createLoadBalancerRule(ruleType.getValue());
LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), cacheKey);
return new LoadBalancer(rule, microserviceName);
}
}