blob: 67222981a648a3b9814be299e12651802b5fff6d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.store.DataStore;
import com.alibaba.dubbo.common.utils.ExecutorUtil;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Client;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * AbstractClient
 * <p>
 * Skeleton for transport clients. It opens the client and connects in the constructor,
 * optionally schedules a periodic connection check that reconnects when the channel is
 * not connected, and delegates channel-level operations (attributes, addresses, send)
 * to the channel returned by {@link #getChannel()}. Subclasses supply the transport
 * specifics through the {@code doXxx()} hooks and {@link #getChannel()}.
 */
public abstract class AbstractClient extends AbstractEndpoint implements Client {

    protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";
    private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class);
    private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger();
    // Static, hence shared by every client instance: runs the periodic connection checks.
    private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("DubboClientReconnectTimer", true));
    // Serializes connect() and disconnect().
    private final Lock connectLock = new ReentrantLock();
    // When true, send() tries to connect first if the client is not connected.
    private final boolean send_reconnect;
    // Number of failed connection checks since the last successful connect().
    private final AtomicInteger reconnect_count = new AtomicInteger(0);
    // Reconnection error log has been called before?
    private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false);
    // reconnect warning period. Reconnect warning interval (log warning after how many times) //for test
    private final int reconnect_warning_period;
    // How long (ms) the connection may stay down before the one-time error log is emitted.
    private final long shutdown_timeout;
    protected volatile ExecutorService executor;
    private volatile ScheduledFuture<?> reconnectExecutorFuture = null;
    // the last time the periodic check observed the client as connected
    private long lastConnectedTime = System.currentTimeMillis();

    /**
     * Opens the client and performs the initial connect.
     *
     * @param url     endpoint URL; also the source of the reconnect/check/timeout parameters
     * @param handler channel handler passed on to {@link AbstractEndpoint}
     * @throws RemotingException if {@link #doOpen()} fails, if connecting fails with an
     *                           unexpected error, or if connecting fails with a
     *                           {@code RemotingException} while the {@code check} parameter
     *                           is true (its default)
     */
    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);

        send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
        shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);
        // The default reconnection interval is 2s, 1800 means warning interval is 1 hour.
        // NOTE(review): the key is misspelt ("waring"); it is kept as-is because callers may already set it.
        reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800);

        try {
            doOpen();
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        try {
            // connect.
            connect();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
            }
        } catch (RemotingException t) {
            if (url.getParameter(Constants.CHECK_KEY, true)) {
                close();
                throw t;
            } else {
                // check == false: keep the client alive; connect() has already scheduled
                // the periodic connection check (when reconnect is enabled) before failing.
                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
            }
        } catch (Throwable t) {
            close();
            throw new RemotingException(url.toInetSocketAddress(), null,
                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
        }

        // Take over the executor registered in the DataStore under consumer side / port and
        // remove the entry. NOTE(review): presumably registered while wrapping the handler; confirm.
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
        ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    }

    /**
     * Wraps the handler via {@link ChannelHandlers#wrap}, after setting the client thread
     * name on the URL and defaulting the thread pool parameter when it is absent.
     *
     * @param url     endpoint URL
     * @param handler handler to wrap
     * @return the wrapped handler
     */
    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

    /**
     * Resolves the reconnect period from the URL.
     *
     * @param url endpoint URL
     * @return the reconnect period in milliseconds; 0 means reconnecting is disabled
     *         ({@code reconnect=false}); the default period is used when the parameter is
     *         absent, empty or {@code true}
     * @throws IllegalArgumentException if the parameter is neither true/false nor a
     *                                  non-negative integer
     */
    private static int getReconnectParam(URL url) {
        String param = url.getParameter(Constants.RECONNECT_KEY);
        if (param == null || param.length() == 0 || "true".equalsIgnoreCase(param)) {
            return Constants.DEFAULT_RECONNECT_PERIOD;
        }
        if ("false".equalsIgnoreCase(param)) {
            return 0;
        }
        int reconnect;
        try {
            reconnect = Integer.parseInt(param);
        } catch (NumberFormatException e) {
            // Keep the parse failure as the cause so the original error is not lost.
            throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param, e);
        }
        if (reconnect < 0) {
            throw new IllegalArgumentException("reconnect param must be nonnegative integer or false/true. input is:" + param);
        }
        return reconnect;
    }

    /**
     * init reconnect thread
     * <p>
     * Schedules the periodic connection check unless reconnecting is disabled or a check
     * is already scheduled and still alive.
     */
    private synchronized void initConnectStatusCheckCommand() {
        //reconnect=false to close reconnect
        int reconnect = getReconnectParam(getUrl());
        // isDone() covers both a cancelled task and one that terminated with an exception;
        // in either case no further runs will happen, so a new task must be scheduled.
        if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isDone())) {
            Runnable connectStatusCheckCommand = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (!isConnected()) {
                            connect();
                        } else {
                            lastConnectedTime = System.currentTimeMillis();
                        }
                    } catch (Throwable t) {
                        String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl();
                        // wait registry sync provider list
                        if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) {
                            // Log at error level only once per outage; connect() resets the flag.
                            if (reconnect_error_log_flag.compareAndSet(false, true)) {
                                logger.error(errorMsg, t);
                                return;
                            }
                        }
                        int failedAttempts = reconnect_count.getAndIncrement();
                        // A configured period of 0 would make '%' throw ArithmeticException here,
                        // which would escape run() and silently stop all further reconnect
                        // attempts; treat 0 as "warn on every failure" instead.
                        if (reconnect_warning_period == 0 || failedAttempts % reconnect_warning_period == 0) {
                            logger.warn(errorMsg, t);
                        }
                    }
                }
            };
            reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the periodic connection check, if one is scheduled and not yet done, and
     * purges it from the shared scheduler. Failures are logged, never thrown.
     */
    private synchronized void destroyConnectStatusCheckCommand() {
        try {
            if (reconnectExecutorFuture != null && !reconnectExecutorFuture.isDone()) {
                reconnectExecutorFuture.cancel(true);
                reconnectExecutorService.purge();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Creates a cached thread pool whose thread names combine the client pool name, a
     * process-wide sequence number and this client's address.
     *
     * @return a new executor service
     */
    protected ExecutorService createExecutor() {
        return Executors.newCachedThreadPool(new NamedThreadFactory(CLIENT_THREAD_POOL_NAME + CLIENT_THREAD_POOL_ID.incrementAndGet() + "-" + getUrl().getAddress(), true));
    }

    /**
     * @return the address to connect to: the URL host, passed through
     *         {@code NetUtils.filterLocalHost}, and the URL port
     */
    public InetSocketAddress getConnectAddress() {
        return new InetSocketAddress(NetUtils.filterLocalHost(getUrl().getHost()), getUrl().getPort());
    }

    /**
     * @return the channel's remote address, or the URL's address when there is no channel
     */
    @Override
    public InetSocketAddress getRemoteAddress() {
        Channel channel = getChannel();
        if (channel == null) {
            return getUrl().toInetSocketAddress();
        }
        return channel.getRemoteAddress();
    }

    /**
     * @return the channel's local address, or an unresolved local-host address with port 0
     *         when there is no channel
     */
    @Override
    public InetSocketAddress getLocalAddress() {
        Channel channel = getChannel();
        if (channel == null) {
            return InetSocketAddress.createUnresolved(NetUtils.getLocalHost(), 0);
        }
        return channel.getLocalAddress();
    }

    /**
     * @return true only when a channel exists and reports itself connected
     */
    @Override
    public boolean isConnected() {
        Channel channel = getChannel();
        if (channel == null) {
            return false;
        }
        return channel.isConnected();
    }

    /**
     * @return the channel attribute for {@code key}, or null when there is no channel
     */
    @Override
    public Object getAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null) {
            return null;
        }
        return channel.getAttribute(key);
    }

    /**
     * Sets an attribute on the channel; does nothing when there is no channel.
     */
    @Override
    public void setAttribute(String key, Object value) {
        Channel channel = getChannel();
        if (channel == null) {
            return;
        }
        channel.setAttribute(key, value);
    }

    /**
     * Removes an attribute from the channel; does nothing when there is no channel.
     */
    @Override
    public void removeAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null) {
            return;
        }
        channel.removeAttribute(key);
    }

    /**
     * @return whether the channel has the attribute; false when there is no channel
     */
    @Override
    public boolean hasAttribute(String key) {
        Channel channel = getChannel();
        if (channel == null) {
            return false;
        }
        return channel.hasAttribute(key);
    }

    /**
     * Sends a message through the channel, connecting first when send-reconnect is enabled
     * and the client is not connected.
     *
     * @param message message to send
     * @param sent    forwarded unchanged to {@code Channel.send}
     * @throws RemotingException if connecting fails, or if there is no connected channel
     */
    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        //TODO Can the value returned by getChannel() be null? need improvement.
        if (channel == null || !channel.isConnected()) {
            throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
        }
        channel.send(message, sent);
    }

    /**
     * Connects to the server if not already connected. The periodic connection check is
     * scheduled before the attempt, so it is in place even when the attempt fails.
     *
     * @throws RemotingException if the client is still not connected after
     *                           {@link #doConnect()}, or if connecting throws
     */
    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            initConnectStatusCheckCommand();
            doConnect();
            if (!isConnected()) {
                throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                        + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
            } else {
                if (logger.isInfoEnabled()) {
                    logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                            + ", channel is " + this.getChannel());
                }
            }
            // Connected: restart the failure bookkeeping used by the periodic check.
            reconnect_count.set(0);
            reconnect_error_log_flag.set(false);
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
                    + ", cause: " + e.getMessage(), e);
        } finally {
            connectLock.unlock();
        }
    }

    /**
     * Stops the periodic connection check, closes the channel (if any) and calls
     * {@link #doDisConnect()}. Failures of the individual steps are logged, never thrown.
     */
    public void disconnect() {
        connectLock.lock();
        try {
            destroyConnectStatusCheckCommand();
            try {
                Channel channel = getChannel();
                if (channel != null) {
                    channel.close();
                }
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
            try {
                doDisConnect();
            } catch (Throwable e) {
                logger.warn(e.getMessage(), e);
            }
        } finally {
            connectLock.unlock();
        }
    }

    /**
     * Disconnects and then connects again.
     *
     * @throws RemotingException if the new connection attempt fails
     */
    @Override
    public void reconnect() throws RemotingException {
        disconnect();
        connect();
    }

    /**
     * Closes the client: shuts down the executor (if any), closes the endpoint,
     * disconnects and calls {@link #doClose()}. Each step is attempted even when an
     * earlier one fails; failures are logged, never thrown.
     */
    @Override
    public void close() {
        try {
            ExecutorService currentExecutor = executor;
            if (currentExecutor != null) {
                ExecutorUtil.shutdownNow(currentExecutor, 100);
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            super.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            disconnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            doClose();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    /**
     * Gracefully shuts down the executor (if any) with the given timeout, then closes the
     * client as {@link #close()} does.
     *
     * @param timeout forwarded unchanged to {@code ExecutorUtil.gracefulShutdown}
     */
    @Override
    public void close(int timeout) {
        // The executor is null until the end of the constructor and may stay null;
        // guard it the same way close() does.
        ExecutorService currentExecutor = executor;
        if (currentExecutor != null) {
            ExecutorUtil.gracefulShutdown(currentExecutor, timeout);
        }
        close();
    }

    @Override
    public String toString() {
        return getClass().getName() + " [" + getLocalAddress() + " -> " + getRemoteAddress() + "]";
    }

    /**
     * Open client.
     *
     * @throws Throwable
     */
    protected abstract void doOpen() throws Throwable;

    /**
     * Close client.
     *
     * @throws Throwable
     */
    protected abstract void doClose() throws Throwable;

    /**
     * Connect to server.
     *
     * @throws Throwable
     */
    protected abstract void doConnect() throws Throwable;

    /**
     * disConnect to server.
     *
     * @throws Throwable
     */
    protected abstract void doDisConnect() throws Throwable;

    /**
     * Get the connected channel.
     *
     * @return channel, or null when there is none (callers in this class check for null)
     */
    protected abstract Channel getChannel();
}