blob: c4e238c38da02f5477d920a0658ba2e6180c59ed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.dubbo;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.Transporter;
import org.apache.dubbo.remoting.exchange.ExchangeChannel;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.STUB_EVENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.remoting.Constants.CHANNEL_READONLYEVENT_SENT_KEY;
import static org.apache.dubbo.remoting.Constants.CLIENT_KEY;
import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
import static org.apache.dubbo.remoting.Constants.DEFAULT_HEARTBEAT;
import static org.apache.dubbo.remoting.Constants.DEFAULT_REMOTING_CLIENT;
import static org.apache.dubbo.remoting.Constants.HEARTBEAT_KEY;
import static org.apache.dubbo.remoting.Constants.SERVER_KEY;
import static org.apache.dubbo.rpc.Constants.DEFAULT_REMOTING_SERVER;
import static org.apache.dubbo.rpc.Constants.DEFAULT_STUB_EVENT;
import static org.apache.dubbo.rpc.Constants.IS_SERVER_KEY;
import static org.apache.dubbo.rpc.Constants.STUB_EVENT_METHODS_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.CALLBACK_SERVICE_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_SHARE_CONNECTIONS;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.IS_CALLBACK_SERVICE;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.ON_CONNECT_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.ON_DISCONNECT_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.SHARE_CONNECTIONS_KEY;
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
public static final int DEFAULT_PORT = 20880;
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
/**
* <host:port,Exchanger>
* Map<String, List<ReferenceCountExchangeClient>
*/
private final Map<String, Object> referenceClientMap = new ConcurrentHashMap<>();
private static final Object PENDING_OBJECT = new Object();
private final AtomicBoolean destroyed = new AtomicBoolean();
private final ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
inv.setServiceModel(invoker.getUrl().getServiceModel());
// switch TCCL
if (invoker.getUrl().getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());
}
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if (logger.isDebugEnabled()) {
logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
tryToGetStubService(channel, invocation);
}
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private void tryToGetStubService(Channel channel, Invocation invocation) throws RemotingException {
try {
Invoker<?> invoker = getInvoker(channel, invocation);
} catch (RemotingException e) {
String serviceKey = serviceKey(
0,
(String) invocation.getObjectAttachmentWithoutConvert(PATH_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(VERSION_KEY),
(String) invocation.getObjectAttachmentWithoutConvert(GROUP_KEY)
);
throw new RemotingException(channel, "The stub service[" + serviceKey + "] is not found, it may not be exported yet");
}
}
/**
* FIXME channel.getUrl() always binds to a fixed service, and this service is random.
* we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
* service this connection is binding to.
* @param channel
* @param url
* @param methodKey
* @return
*/
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]);
invocation.setAttachment(PATH_KEY, url.getPath());
invocation.setAttachment(GROUP_KEY, url.getGroup());
invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
invocation.setAttachment(VERSION_KEY, url.getVersion());
if (url.getParameter(STUB_EVENT_KEY, false)) {
invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
public DubboProtocol() {
}
/**
* @deprecated Use {@link DubboProtocol#getDubboProtocol(ScopeModel)} instead
*/
@Deprecated
public static DubboProtocol getDubboProtocol() {
return (DubboProtocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME, false);
}
public static DubboProtocol getDubboProtocol(ScopeModel scopeModel) {
return (DubboProtocol) scopeModel.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME, false);
}
private boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(channel.getUrl().getIp())
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke;
boolean isStubServiceInvoke;
int port = channel.getLocalAddress().getPort();
String path = (String) inv.getObjectAttachmentWithoutConvert(PATH_KEY);
//if it's stub service on client side(after enable stubevent, usually is set up onconnect or ondisconnect method)
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(STUB_EVENT_KEY));
if (isStubServiceInvoke) {
//when a stub service export to local, it usually can't be exposed to port
port = 0;
}
// if it's callback service on client side
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getObjectAttachmentWithoutConvert(CALLBACK_SERVICE_KEY);
inv.setObjectAttachment(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(
port,
path,
(String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY),
(String) inv.getObjectAttachmentWithoutConvert(GROUP_KEY)
);
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
}
return exporter.getInvoker();
}
public Collection<Invoker<?>> getInvokers() {
return Collections.unmodifiableCollection(invokers);
}
@Override
public int getDefaultPort() {
return DEFAULT_PORT;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
checkDestroyed();
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//export a stub service for dispatching event
boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
boolean isCallbackService = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackService) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stub proxy support event ,but no stub methods founded."));
}
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
checkDestroyed();
// find server.
String key = url.getAddress();
// client can export a service which only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
return;
}
}
}
// server supports reset, use together with override
server.reset(url);
}
}
private void checkDestroyed() {
if (destroyed.get()) {
throw new IllegalStateException(getClass().getSimpleName() + " is destroyed");
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String transporter = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
throw new RpcException("Unsupported server type: " + transporter + ", url: " + url);
}
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
transporter = url.getParameter(CLIENT_KEY);
if (StringUtils.isNotEmpty(transporter) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(transporter)) {
throw new RpcException("Unsupported client type: " + transporter);
}
DubboProtocolServer protocolServer = new DubboProtocolServer(server);
loadServerProperties(protocolServer);
return protocolServer;
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
checkDestroyed();
return protocolBindingRefer(type, url);
}
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
checkDestroyed();
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
int connections = url.getParameter(CONNECTIONS_KEY, 0);
// whether to share connection
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
/*
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = StringUtils.isBlank(url.getParameter(SHARE_CONNECTIONS_KEY, (String) null))
? ConfigurationUtils.getProperty(url.getOrDefaultApplicationModel(), SHARE_CONNECTIONS_KEY, DEFAULT_SHARE_CONNECTIONS)
: url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(shareConnectionsStr);
List<ReferenceCountExchangeClient> shareClients = getSharedClient(url, connections);
ExchangeClient[] clients = new ExchangeClient[connections];
Arrays.setAll(clients, shareClients::get);
return clients;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
clients[i] = initClient(url);
}
return clients;
}
/**
* Get shared connection
*
* @param url
* @param connectNum connectNum must be greater than or equal to 1
*/
@SuppressWarnings("unchecked")
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
Object clients = referenceClientMap.get(key);
if (clients instanceof List) {
List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
}
}
List<ReferenceCountExchangeClient> typedClients = null;
synchronized (referenceClientMap) {
for (; ; ) {
// guarantee just one thread in loading condition. And Other is waiting It had finished.
clients = referenceClientMap.get(key);
if (clients instanceof List) {
typedClients = (List<ReferenceCountExchangeClient>) clients;
if (checkClientCanUse(typedClients)) {
batchClientRefIncr(typedClients);
return typedClients;
} else {
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
} else if (clients == PENDING_OBJECT) {
try {
referenceClientMap.wait();
} catch (InterruptedException ignored) {
}
} else {
referenceClientMap.put(key, PENDING_OBJECT);
break;
}
}
}
try {
// connectNum must be greater than or equal to 1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
if (CollectionUtils.isEmpty(typedClients)) {
typedClients = buildReferenceCountExchangeClientList(url, connectNum);
} else {
for (int i = 0; i < typedClients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = typedClients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
typedClients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
} finally {
synchronized (referenceClientMap) {
if (typedClients == null) {
referenceClientMap.remove(key);
} else {
referenceClientMap.put(key, typedClients);
}
referenceClientMap.notifyAll();
}
}
return typedClients;
}
/**
* Check if the client list is all available
*
* @param referenceCountExchangeClients
* @return true-available,false-unavailable
*/
private boolean checkClientCanUse(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
return false;
}
// As long as one client is not available, you need to replace the unavailable client with the available one.
return referenceCountExchangeClients.stream()
.noneMatch(referenceCountExchangeClient -> referenceCountExchangeClient == null
|| referenceCountExchangeClient.getCount() <= 0 || referenceCountExchangeClient.isClosed());
}
/**
* Increase the reference Count if we create new invoker shares same connection, the connection will be closed without any reference.
*
* @param referenceCountExchangeClients
*/
private void batchClientRefIncr(List<ReferenceCountExchangeClient> referenceCountExchangeClients) {
if (CollectionUtils.isEmpty(referenceCountExchangeClients)) {
return;
}
referenceCountExchangeClients.stream()
.filter(Objects::nonNull)
.forEach(ReferenceCountExchangeClient::incrementAndGetCount);
}
/**
* Bulk build client
*
* @param url
* @param connectNum
* @return
*/
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
List<ReferenceCountExchangeClient> clients = new ArrayList<>();
for (int i = 0; i < connectNum; i++) {
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
/**
* Build a single client
*
* @param url
* @return
*/
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, DubboCodec.NAME);
// read configs
int shutdownTimeout = ConfigurationUtils.getServerShutdownTimeout(url.getScopeModel());
client.setShutdownWaitTime(shutdownTimeout);
return client;
}
/**
* Create new connection
*
* @param url
*/
private ExchangeClient initClient(URL url) {
/*
* Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
* which means params are shared among different services. Since client is shared among services this is currently not a problem.
*/
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
// BIO is not allowed since it has severe performance issue.
if (StringUtils.isNotEmpty(str) && !url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(url.getOrDefaultFrameworkModel().getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
try {
// Replace InstanceAddressURL with ServiceConfigURL.
url = new ServiceConfigURL(DubboCodec.NAME, url.getUsername(), url.getPassword(), url.getHost(), url.getPort(), url.getPath(), url.getAllParameters());
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// connection should be lazy
return url.getParameter(LAZY_CONNECT_KEY, false)
? new LazyConnectExchangeClient(url, requestHandler)
: Exchangers.connect(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
}
@Override
@SuppressWarnings("unchecked")
public void destroy() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Destroying protocol [" + this.getClass().getSimpleName() + "] ...");
}
for (String key : new ArrayList<>(serverMap.keySet())) {
ProtocolServer protocolServer = serverMap.remove(key);
if (protocolServer == null) {
continue;
}
RemotingServer server = protocolServer.getRemotingServer();
try {
if (logger.isInfoEnabled()) {
logger.info("Closing dubbo server: " + server.getLocalAddress());
}
server.close(getServerShutdownTimeout(protocolServer));
} catch (Throwable t) {
logger.warn("Close dubbo server [" + server.getLocalAddress() + "] failed: " + t.getMessage(), t);
}
}
serverMap.clear();
for (String key : new ArrayList<>(referenceClientMap.keySet())) {
Object clients = referenceClientMap.remove(key);
if (clients instanceof List) {
List<ReferenceCountExchangeClient> typedClients = (List<ReferenceCountExchangeClient>) clients;
if (CollectionUtils.isEmpty(typedClients)) {
continue;
}
for (ReferenceCountExchangeClient client : typedClients) {
closeReferenceCountExchangeClient(client);
}
}
}
PortUnificationExchanger.close();
referenceClientMap.clear();
super.destroy();
}
/**
* close ReferenceCountExchangeClient
*
* @param client
*/
private void closeReferenceCountExchangeClient(ReferenceCountExchangeClient client) {
if (client == null) {
return;
}
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(client.getShutdownWaitTime());
// TODO
/*
* At this time, ReferenceCountExchangeClient#client has been replaced with LazyConnectExchangeClient.
* Do you need to call client.close again to ensure that LazyConnectExchangeClient is also closed?
*/
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
/**
* only log body in debugger mode for size & security consideration.
*
* @param invocation
* @return
*/
private Invocation getInvocationWithoutData(Invocation invocation) {
if (logger.isDebugEnabled()) {
return invocation;
}
if (invocation instanceof RpcInvocation) {
RpcInvocation rpcInvocation = (RpcInvocation) invocation;
rpcInvocation.setArguments(null);
return rpcInvocation;
}
return invocation;
}
}