blob: f3fddd1fe9eae12d1398fd63be1d3e3882385f8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.loadbalance;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import javax.ws.rs.core.Response.Status;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.core.Endpoint;
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.Transport;
import org.apache.servicecomb.core.governance.RetryContext;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
import org.apache.servicecomb.loadbalance.filter.ServerDiscoveryFilter;
import org.apache.servicecomb.registry.discovery.DiscoveryContext;
import org.apache.servicecomb.registry.discovery.DiscoveryFilter;
import org.apache.servicecomb.registry.discovery.DiscoveryTree;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.ExceptionFactory;
import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.config.DynamicPropertyFactory;
/**
* Load balance handler.
*/
public class LoadbalanceHandler implements Handler {
public static final String CONTEXT_KEY_LAST_SERVER = "x-context-last-server";
// Enough times to make sure to choose a different server in high volume.
private static final int COUNT = 17;
public static final String CONTEXT_KEY_SERVER_LIST = "x-context-server-list";
public static final String SERVICECOMB_SERVER_ENDPOINT = "scb-endpoint";
// set endpoint in invocation.localContext
// ignore logic of loadBalance
public static final boolean supportDefinedEndpoint =
DynamicPropertyFactory.getInstance()
.getBooleanProperty("servicecomb.loadbalance.userDefinedEndpoint.enabled", false).get();
private static final Logger LOGGER = LoggerFactory.getLogger(LoadbalanceHandler.class);
private DiscoveryTree discoveryTree = new DiscoveryTree();
// key为grouping filter qualified name
private final Map<String, LoadBalancer> loadBalancerMap = new ConcurrentHashMapEx<>();
private final Object lock = new Object();
private String strategy = null;
public LoadbalanceHandler(DiscoveryTree discoveryTree) {
this.discoveryTree = discoveryTree;
}
public LoadbalanceHandler() {
preCheck();
discoveryTree.loadFromSPI(DiscoveryFilter.class);
discoveryTree.addFilter(new ServerDiscoveryFilter());
discoveryTree.sort();
}
private void preCheck() {
// Old configurations check.Just print an error, because configurations may given in dynamic and fail on runtime.
String policyName = DynamicPropertyFactory.getInstance()
.getStringProperty("servicecomb.loadbalance.NFLoadBalancerRuleClassName", null)
.get();
if (!StringUtils.isEmpty(policyName)) {
LOGGER.error("[servicecomb.loadbalance.NFLoadBalancerRuleClassName] is not supported anymore." +
"use [servicecomb.loadbalance.strategy.name] instead.");
}
String filterNames = Configuration.getStringProperty(null, "servicecomb.loadbalance.serverListFilters");
if (!StringUtils.isEmpty(filterNames)) {
LOGGER.error(
"Server list implementation changed to SPI. Configuration [servicecomb.loadbalance.serverListFilters]" +
" is not used any more. For ServiceComb defined filters, you do not need config and can "
+ "remove this configuration safely. If you define your own filter, need to change it to SPI to make it work.");
}
}
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
AsyncResponse response = asyncResp;
asyncResp = async -> {
ServiceCombServerStats.checkAndReleaseTryingChance(invocation);
response.handle(async);
};
if (handleSuppliedEndpoint(invocation, asyncResp)) {
invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, false);
return;
}
invocation.addLocalContext(RetryContext.RETRY_LOAD_BALANCE, true);
String strategy = Configuration.INSTANCE.getRuleStrategyName(invocation.getMicroserviceName());
if (!Objects.equals(strategy, this.strategy)) {
//配置变化,需要重新生成所有的lb实例
synchronized (lock) {
clearLoadBalancer();
}
}
this.strategy = strategy;
LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);
send(invocation, asyncResp, loadBalancer);
}
// user's can invoke a service by supplying target Endpoint.
// in this case, we do not using load balancer, and no stats of server calculated, no retrying.
private boolean handleSuppliedEndpoint(Invocation invocation,
AsyncResponse asyncResp) throws Exception {
if (invocation.getEndpoint() != null) {
invocation.next(asyncResp);
return true;
}
if (supportDefinedEndpoint) {
return defineEndpointAndHandle(invocation, asyncResp);
}
return false;
}
private Endpoint parseEndpoint(String endpointUri) throws Exception {
URI formatUri = new URI(endpointUri);
Transport transport = SCBEngine.getInstance().getTransportManager().findTransport(formatUri.getScheme());
if (transport == null) {
LOGGER.error("not deployed transport {}, ignore {}.", formatUri.getScheme(), endpointUri);
throw new InvocationException(Status.BAD_REQUEST,
"the endpoint's transport is not found.");
}
return new Endpoint(transport, endpointUri);
}
private boolean defineEndpointAndHandle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
Object endpoint = invocation.getLocalContext(SERVICECOMB_SERVER_ENDPOINT);
if (endpoint == null) {
return false;
}
if (endpoint instanceof String) {
// compatible to old usage
endpoint = parseEndpoint((String) endpoint);
}
invocation.setEndpoint((Endpoint) endpoint);
invocation.next(asyncResp);
return true;
}
private void clearLoadBalancer() {
loadBalancerMap.clear();
}
@VisibleForTesting
void send(Invocation invocation, AsyncResponse asyncResp, LoadBalancer chosenLB) throws Exception {
long time = System.currentTimeMillis();
ServiceCombServer server = chooseServer(invocation, chosenLB);
if (null == server) {
asyncResp.consumerFail(new InvocationException(Status.INTERNAL_SERVER_ERROR, "No available address found."));
return;
}
chosenLB.getLoadBalancerStats().incrementNumRequests(server);
invocation.setEndpoint(server.getEndpoint());
invocation.next(resp -> {
// this stats is for WeightedResponseTimeRule
chosenLB.getLoadBalancerStats().noteResponseTime(server, (System.currentTimeMillis() - time));
if (isFailedResponse(resp)) {
// this stats is for SessionStickinessRule
chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(server);
ServiceCombLoadBalancerStats.INSTANCE.markFailure(server);
} else {
chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(server);
ServiceCombLoadBalancerStats.INSTANCE.markSuccess(server);
}
asyncResp.handle(resp);
});
}
private ServiceCombServer chooseServer(Invocation invocation, LoadBalancer chosenLB) {
RetryContext retryContext = invocation.getLocalContext(RetryContext.RETRY_CONTEXT);
if (retryContext == null) {
return chosenLB.chooseServer(invocation);
}
if (!retryContext.isRetry()) {
ServiceCombServer server = chosenLB.chooseServer(invocation);
invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, server);
return server;
}
ServiceCombServer lastServer = invocation.getLocalContext(CONTEXT_KEY_LAST_SERVER);
ServiceCombServer nextServer = lastServer;
if (!retryContext.trySameServer()) {
for (int i = 0; i < COUNT; i++) {
ServiceCombServer s = chosenLB.chooseServer(invocation);
if (s == null) {
break;
}
if (!s.equals(nextServer)) {
nextServer = s;
break;
}
}
}
LOGGER.info("operation failed {}, retry to instance [{}], last instance [{}], trace id {}",
invocation.getMicroserviceQualifiedName(),
nextServer == null ? "" : nextServer.getHostPort(),
lastServer == null ? "" : lastServer.getHostPort(),
invocation.getTraceId());
invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, nextServer);
return nextServer;
}
protected boolean isFailedResponse(Response resp) {
if (resp.isFailed()) {
if (resp.getResult() instanceof InvocationException) {
InvocationException e = resp.getResult();
return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE
|| e.getStatusCode() == Status.SERVICE_UNAVAILABLE.getStatusCode()
|| e.getStatusCode() == Status.REQUEST_TIMEOUT.getStatusCode();
} else {
return true;
}
} else {
return false;
}
}
protected LoadBalancer getOrCreateLoadBalancer(Invocation invocation) {
DiscoveryContext context = new DiscoveryContext();
context.setInputParameters(invocation);
VersionedCache serversVersionedCache = discoveryTree.discovery(context,
invocation.getAppId(),
invocation.getMicroserviceName(),
invocation.getMicroserviceVersionRule());
invocation.addLocalContext(CONTEXT_KEY_SERVER_LIST, serversVersionedCache.data());
return loadBalancerMap
.computeIfAbsent(serversVersionedCache.name(), name -> createLoadBalancer(invocation.getMicroserviceName()));
}
private LoadBalancer createLoadBalancer(String microserviceName) {
RuleExt rule = ExtensionsManager.createLoadBalancerRule(microserviceName);
return new LoadBalancer(rule, microserviceName);
}
}