blob: 7be50fff2e9adfe088c10fdb310fd3480b0e872d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.transport;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
import static org.apache.dubbo.config.Constants.CLIENT_THREAD_POOL_NAME;
import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK;
import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION;
import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION;
import static org.apache.dubbo.remoting.Constants.LEAST_RECONNECT_DURATION_KEY;
import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
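/**
 * Base class for {@link Client} implementations.
 *
 * <p>The constructor opens the client and attempts the first connection. Channel-related
 * operations are delegated to the channel returned by {@link #getChannel()}, and connect,
 * disconnect, reconnect and close all run while holding the same lock.
 */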
public abstract class AbstractClient extends AbstractEndpoint implements Client {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
protected volatile ExecutorService executor;
protected volatile ScheduledExecutorService connectivityExecutor;
private FrameworkModel frameworkModel;
protected long reconnectDuaration;
/**
 * Creates the client, opens it via {@link #doOpen()} and attempts the first {@link #connect()}.
 *
 * <p>A failure while opening always closes the client and is rethrown as a
 * {@link RemotingException}. A {@link RemotingException} raised by the first connect is only
 * logged when the URL enables {@code LAZY_CONNECT_KEY} or sets {@code Constants.CHECK_KEY} to
 * {@code false}; otherwise the client is closed and the exception is rethrown. Any other
 * failure while connecting closes the client and is wrapped in a {@link RemotingException}.
 *
 * @param url     URL describing the remote endpoint and the client options
 * @param handler channel handler passed on to the parent endpoint
 * @throws RemotingException if opening fails, or connecting fails and the failure is not ignorable
 */
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    // Reconnect-before-send is enabled unless the URL explicitly turns it off.
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, true);
    frameworkModel = url.getOrDefaultFrameworkModel();
    initExecutor(url);
    reconnectDuaration = getReconnectDuration(url);

    try {
        doOpen();
    } catch (Throwable openFailure) {
        close();
        throw new RemotingException(
                url.toInetSocketAddress(), null, startFailureMessage(", cause: ", openFailure), openFailure);
    }

    try {
        connect();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException connectFailure) {
        // If lazy connect client fails to establish a connection, the client instance will still be created,
        // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
            logger.warn(
                    TRANSPORT_FAILED_CONNECT_PROVIDER,
                    "",
                    "",
                    startFailureMessage(
                            " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: ",
                            connectFailure),
                    connectFailure);
            return;
        }
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            // Checking is on: a failed first connection is fatal for this client.
            close();
            throw connectFailure;
        }
        logger.warn(
                TRANSPORT_FAILED_CONNECT_PROVIDER,
                "",
                "",
                startFailureMessage(" (check == false, ignore and retry later!), cause: ", connectFailure),
                connectFailure);
    } catch (Throwable unexpected) {
        close();
        throw new RemotingException(
                url.toInetSocketAddress(), null, startFailureMessage(", cause: ", unexpected), unexpected);
    }
}

/**
 * Builds the "Failed to start ..." message used by the constructor's failure paths.
 *
 * @param causeLabel text placed between the remote address and the cause message
 * @param cause      failure whose message is appended last
 * @return the assembled message
 */
private String startFailureMessage(String causeLabel, Throwable cause) {
    return "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
            + " connect to the server " + getRemoteAddress() + causeLabel + cause.getMessage();
}
/**
 * Initializes {@link #executor} from the application's executor repository and
 * {@link #connectivityExecutor} from the framework-level executor repository.
 *
 * @param url client URL; a thread name and, if absent, a thread pool type are added before the
 *            executor is looked up or created
 */
private void initExecutor(URL url) {
    ExecutorRepository repository = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
    /*
     * Consumer's executor is shared globally, provider ip doesn't need to be part of the thread name.
     *
     * Instance of url is InstanceAddressURL, so addParameter actually adds parameters into ServiceInstance,
     * which means params are shared among different services. Since client is shared among services this is
     * currently not a problem.
     */
    URL executorUrl = url.addParameter(THREAD_NAME_KEY, CLIENT_THREAD_POOL_NAME)
            .addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
    executor = repository.createExecutorIfAbsent(executorUrl);

    FrameworkExecutorRepository frameworkExecutors =
            frameworkModel.getBeanFactory().getBean(FrameworkExecutorRepository.class);
    connectivityExecutor = frameworkExecutors.getConnectivityScheduledExecutor();
}
/**
 * Wraps the given handler by delegating to {@link ChannelHandlers#wrap}.
 *
 * @param url     URL passed to the wrapper
 * @param handler handler to wrap
 * @return the wrapped handler
 */
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    ChannelHandler wrapped = ChannelHandlers.wrap(handler, url);
    return wrapped;
}
/**
 * Returns the address to connect to, built from the URL's host (passed through
 * {@link NetUtils#filterLocalHost}) and the URL's port.
 *
 * @return a new socket address for the remote endpoint
 */
public InetSocketAddress getConnectAddress() {
    String host = NetUtils.filterLocalHost(getUrl().getHost());
    int port = getUrl().getPort();
    return new InetSocketAddress(host, port);
}
/**
 * Returns the channel's remote address, or the address derived from the URL when there is no
 * channel.
 */
@Override
public InetSocketAddress getRemoteAddress() {
    Channel current = getChannel();
    return current == null ? getUrl().toInetSocketAddress() : current.getRemoteAddress();
}
/**
 * Returns the channel's local address, or an unresolved address made of the local host name
 * and port 0 when there is no channel.
 */
@Override
public InetSocketAddress getLocalAddress() {
    Channel current = getChannel();
    return current == null
            ? InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0)
            : current.getLocalAddress();
}
/**
 * Reports whether a channel exists and that channel is connected.
 */
@Override
public boolean isConnected() {
    Channel current = getChannel();
    return current != null && current.isConnected();
}
/**
 * Reads an attribute from the channel; returns {@code null} when there is no channel.
 */
@Override
public Object getAttribute(String key) {
    Channel current = getChannel();
    return current == null ? null : current.getAttribute(key);
}
/**
 * Stores an attribute on the channel; does nothing when there is no channel.
 */
@Override
public void setAttribute(String key, Object value) {
    Channel current = getChannel();
    if (current != null) {
        current.setAttribute(key, value);
    }
}
/**
 * Removes an attribute from the channel; does nothing when there is no channel.
 */
@Override
public void removeAttribute(String key) {
    Channel current = getChannel();
    if (current != null) {
        current.removeAttribute(key);
    }
}
/**
 * Reports whether the channel holds the given attribute; {@code false} when there is no channel.
 */
@Override
public boolean hasAttribute(String key) {
    Channel current = getChannel();
    return current != null && current.hasAttribute(key);
}
/**
 * Sends a message through the current channel, connecting first when reconnect-on-send is
 * enabled and the client is not connected.
 *
 * @param message payload handed to the channel
 * @param sent    flag forwarded unchanged to the channel's {@code send}
 * @throws RemotingException if connecting fails, or no connected channel is available afterwards
 */
@Override
public void send(Object message, boolean sent) throws RemotingException {
    boolean shouldConnectFirst = needReconnect && !isConnected();
    if (shouldConnectFirst) {
        connect();
    }

    Channel target = getChannel();
    // TODO Can the value returned by getChannel() be null? need improvement.
    boolean usable = target != null && target.isConnected();
    if (!usable) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    target.send(message, sent);
}
/**
 * Connects to the server while holding the connect lock.
 *
 * <p>Returns immediately when already connected. When the client is closed or closing, a
 * warning is logged and no connection is attempted. Otherwise {@link #doConnect()} runs and
 * the result is verified with {@link #isConnected()}.
 *
 * @throws RemotingException if the client is still not connected after {@link #doConnect()},
 *                           or if {@link #doConnect()} fails (non-remoting failures are wrapped)
 */
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) {
            return;
        }
        if (isClosed() || isClosing()) {
            logger.warn(
                    TRANSPORT_FAILED_CONNECT_PROVIDER,
                    "",
                    "",
                    describeConnection("No need to connect to server ")
                            + ", cause: client status is closed or closing.");
            return;
        }

        doConnect();

        if (!isConnected()) {
            throw new RemotingException(
                    this,
                    describeConnection("Failed to connect to server ")
                            + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
        }
        if (logger.isInfoEnabled()) {
            logger.info(
                    describeConnection("Successfully connect to server ") + ", channel is " + this.getChannel());
        }
    } catch (RemotingException remotingFailure) {
        // Already the right type: propagate untouched.
        throw remotingFailure;
    } catch (Throwable otherFailure) {
        throw new RemotingException(
                this,
                describeConnection("Failed to connect to server ") + ", cause: " + otherFailure.getMessage(),
                otherFailure);
    } finally {
        connectLock.unlock();
    }
}

/**
 * Builds the common part of the connect log/exception messages: the given lead-in followed by
 * the remote address, this class's simple name, the local host and the dubbo version.
 *
 * @param leadIn text that starts the message
 * @return the assembled message prefix
 */
private String describeConnection(String leadIn) {
    return leadIn + getRemoteAddress() + " from "
            + getClass().getSimpleName() + " "
            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion();
}
/**
 * Disconnects while holding the connect lock: closes the current channel, if any, and then
 * calls {@link #doDisConnect()}. A failure in either step is logged as a warning and not
 * propagated, so the second step runs even when the first one fails.
 */
public void disconnect() {
    connectLock.lock();
    try {
        // Step 1: close the current channel, if there is one.
        try {
            Channel active = getChannel();
            if (active != null) {
                active.close();
            }
        } catch (Throwable channelFailure) {
            logger.warn(TRANSPORT_FAILED_CLOSE, "", "", channelFailure.getMessage(), channelFailure);
        }

        // Step 2: let the implementation release its own connection state.
        try {
            doDisConnect();
        } catch (Throwable hookFailure) {
            logger.warn(TRANSPORT_FAILED_CLOSE, "", "", hookFailure.getMessage(), hookFailure);
        }
    } finally {
        connectLock.unlock();
    }
}
/**
 * Computes the reconnect duration for the given URL: the idle timeout is converted with
 * {@link #calculateLeastDuration(int)} and the result is passed through
 * {@link #calculateReconnectDuration(URL, long)}.
 *
 * @param url client URL
 * @return the reconnect duration
 */
private long getReconnectDuration(URL url) {
    return calculateReconnectDuration(url, calculateLeastDuration(getIdleTimeout(url)));
}
/**
 * Divides the given time by {@code HEARTBEAT_CHECK_TICK}; when the quotient is zero or
 * negative, {@code LEAST_HEARTBEAT_DURATION} is returned instead.
 *
 * @param time time value to divide
 * @return the quotient, or {@code LEAST_HEARTBEAT_DURATION} when the quotient is not positive
 */
private long calculateLeastDuration(int time) {
    long perTick = time / HEARTBEAT_CHECK_TICK;
    return perTick <= 0 ? LEAST_HEARTBEAT_DURATION : perTick;
}
/**
 * Returns the larger of {@code tick} and the URL's {@code LEAST_RECONNECT_DURATION_KEY}
 * parameter, which falls back to {@code LEAST_RECONNECT_DURATION} when absent.
 *
 * @param url  client URL that may override the minimum
 * @param tick candidate duration
 * @return the larger of the two values
 */
private long calculateReconnectDuration(URL url, long tick) {
    return Math.max(url.getParameter(LEAST_RECONNECT_DURATION_KEY, LEAST_RECONNECT_DURATION), tick);
}
/**
 * Disconnects and then connects again while holding the connect lock.
 *
 * <p>{@link #disconnect()} and {@link #connect()} each acquire the same {@link ReentrantLock};
 * taking it here as well keeps both steps inside one locked section.
 *
 * @throws RemotingException if {@link #connect()} fails
 */
@Override
public void reconnect() throws RemotingException {
connectLock.lock();
try {
// Order matters: drop the existing connection before establishing a new one.
disconnect();
connect();
} finally {
connectLock.unlock();
}
}
/**
 * Closes the client. When it is already closed, a warning is logged and nothing else happens.
 * Otherwise, under the connect lock, the parent {@code close()}, {@link #disconnect()} and
 * {@link #doClose()} run in that order; a failure in any step is logged as a warning and does
 * not stop the remaining steps.
 */
@Override
public void close() {
    // First check without the lock.
    if (isClosed()) {
        warnAlreadyClosed();
        return;
    }

    connectLock.lock();
    try {
        // Check again now that the lock is held; the state may have changed while waiting.
        if (isClosed()) {
            warnAlreadyClosed();
            return;
        }

        try {
            super.close();
        } catch (Throwable parentFailure) {
            logger.warn(TRANSPORT_FAILED_CLOSE, "", "", parentFailure.getMessage(), parentFailure);
        }

        try {
            disconnect();
        } catch (Throwable disconnectFailure) {
            logger.warn(TRANSPORT_FAILED_CLOSE, "", "", disconnectFailure.getMessage(), disconnectFailure);
        }

        try {
            doClose();
        } catch (Throwable hookFailure) {
            logger.warn(TRANSPORT_FAILED_CLOSE, "", "", hookFailure.getMessage(), hookFailure);
        }
    } finally {
        connectLock.unlock();
    }
}

/** Logs the warning emitted when {@link #close()} is called on an already closed client. */
private void warnAlreadyClosed() {
    logger.warn(
            TRANSPORT_FAILED_CONNECT_PROVIDER,
            "",
            "",
            "No need to close connection to server " + getRemoteAddress() + " from "
                    + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version "
                    + Version.getVersion() + ", cause: the client status is closed.");
}
/**
 * Closes the client by delegating to {@link #close()}; the timeout argument is not used.
 *
 * @param timeout ignored by this implementation
 */
@Override
public void close(int timeout) {
    this.close();
}
/**
 * Returns the class name followed by {@code [local -> remote]} addresses.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder(getClass().getName());
    text.append(" [").append(getLocalAddress());
    text.append(" -> ").append(getRemoteAddress());
    text.append("]");
    return text.toString();
}
/**
 * Opens the client.
 *
 * <p>Called from the constructor before the first {@link #connect()}. If it throws, the
 * constructor closes the client and throws a {@link RemotingException} wrapping the failure.
 *
 * @throws Throwable if the client cannot be opened
 */
protected abstract void doOpen() throws Throwable;
/**
 * Closes the client.
 *
 * <p>Called by {@link #close()} under the connect lock, after the parent {@code close()} and
 * {@link #disconnect()} have been attempted. Anything thrown here is logged as a warning by
 * {@link #close()} and not propagated.
 *
 * @throws Throwable if closing fails
 */
protected abstract void doClose() throws Throwable;
/**
 * Connects to the server.
 *
 * <p>Called by {@link #connect()} under the connect lock, only when the client is not already
 * connected and is neither closed nor closing. After this method returns, {@link #connect()}
 * checks {@link #isConnected()} and throws a {@link RemotingException} if the client is still
 * not connected. A {@link RemotingException} thrown here is propagated as-is; any other
 * failure is wrapped in a {@link RemotingException}.
 *
 * @throws Throwable if the connection attempt fails
 */
protected abstract void doConnect() throws Throwable;
/**
 * Disconnects from the server.
 *
 * <p>Called by {@link #disconnect()} under the connect lock, after it has tried to close the
 * current channel. Anything thrown here is logged as a warning by {@link #disconnect()} and
 * not propagated.
 *
 * @throws Throwable if disconnecting fails
 */
protected abstract void doDisConnect() throws Throwable;
/**
 * Get the connected channel.
 *
 * <p>Every caller in this class checks the result for {@code null} and treats {@code null} as
 * "no channel available" (for example, {@link #isConnected()} then returns {@code false}).
 *
 * @return channel, possibly {@code null}
 */
protected abstract Channel getChannel();
}