blob: 091376cf5f4be65dce73b3372e143a7279ae79a5 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.exchange.support.header;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.Version;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeServer;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.support.DefaultFuture;
/**
 * {@link ExchangeServer} that wraps a transport-level {@link Server}.
 * <p>
 * Most calls are delegated straight to the wrapped server. On top of that this class
 * exposes the server's channels as {@link ExchangeChannel}s, schedules a periodic
 * {@link HeartBeatTask} over those channels, and on {@link #close(int)} waits (bounded by
 * the timeout) for channels that still have a pending {@link DefaultFuture}.
 *
 * @author william.liangf
 */
public class HeaderExchangeServer implements ExchangeServer {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    // Single-threaded scheduler that runs the heartbeat task for this server.
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));

    // Handle of the currently scheduled heartbeat task; null when none is scheduled.
    private ScheduledFuture<?> heartbeatTimer;

    // Heartbeat interval in milliseconds. The heartbeat task is only scheduled when this is > 0.
    private int heartbeat;

    // Heartbeat timeout handed to the heartbeat task; must be at least twice the interval.
    private int heartbeatTimeout;

    private final Server server;

    // Set once by doClose(); makes send(...) fail fast after this exchange server was closed.
    private volatile boolean closed = false;

    /**
     * Wraps the given server and starts the heartbeat timer using the heartbeat settings
     * found in the server's URL.
     *
     * @param server the transport server to decorate, must not be {@code null}
     * @throws IllegalArgumentException if {@code server} is {@code null}
     * @throws IllegalStateException    if the configured heartbeat timeout is less than
     *                                  twice the heartbeat interval
     */
    public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
        // When no explicit timeout is configured it defaults to three heartbeat intervals.
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeartbeatTimer();
    }

    /**
     * @return the wrapped transport server
     */
    public Server getServer() {
        return server;
    }

    /**
     * Reports the closed state of the wrapped server (not the local {@code closed} flag
     * that guards {@link #send(Object)}).
     */
    public boolean isClosed() {
        return server.isClosed();
    }

    /**
     * @return {@code true} if at least one channel still has a pending {@link DefaultFuture}
     */
    private boolean isRunning() {
        Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {
            if (DefaultFuture.hasFuture(channel)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Stops the heartbeat machinery and closes the wrapped server immediately.
     */
    public void close() {
        doClose();
        server.close();
    }

    /**
     * Closes this server, first waiting up to {@code timeout} milliseconds for channels
     * with pending futures to finish.
     * <p>
     * If the URL enables {@link Constants#CHANNEL_SEND_READONLYEVENT_KEY}, a read-only
     * event is sent to every connected channel before waiting. If the calling thread is
     * interrupted while waiting, the wait is abandoned, the interrupt status is restored
     * and the close sequence continues.
     *
     * @param timeout maximum wait in milliseconds; values {@code <= 0} skip the wait
     */
    public void close(final int timeout) {
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, false)) {
                sendChannelReadOnlyEvent();
            }
            while (isRunning() && System.currentTimeMillis() - start < max) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                    // Do not swallow the interrupt: put the flag back for the caller and
                    // stop waiting, otherwise an interrupted thread would still block for
                    // the whole timeout.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        doClose();
        server.close(timeout);
    }

    /**
     * Sends a one-way {@link Request#READONLY_EVENT} to every connected channel.
     * Failures on individual channels are logged and do not stop the loop.
     */
    private void sendChannelReadOnlyEvent() {
        Request request = new Request();
        request.setEvent(Request.READONLY_EVENT);
        request.setTwoWay(false);
        request.setVersion(Version.getVersion());

        // The "sent" flag is the same for every channel, so look it up once.
        final boolean sent = getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true);
        Collection<Channel> channels = getChannels();
        for (Channel channel : channels) {
            try {
                if (channel.isConnected()) {
                    channel.send(request, sent);
                }
            } catch (RemotingException e) {
                logger.warn("send connot write messge error.", e);
            }
        }
    }

    /**
     * Marks this exchange server as closed, cancels the heartbeat task and shuts the
     * scheduler down. Subsequent calls return immediately.
     */
    private void doClose() {
        if (closed) {
            return;
        }
        closed = true;
        stopHeartbeatTimer();
        try {
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }

    /**
     * @return a new collection holding an {@link ExchangeChannel} for every channel of the
     *         wrapped server; empty when the server reports no channels
     */
    public Collection<ExchangeChannel> getExchangeChannels() {
        Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
        Collection<Channel> channels = server.getChannels();
        if (channels != null && !channels.isEmpty()) {
            for (Channel channel : channels) {
                exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
            }
        }
        return exchangeChannels;
    }

    /**
     * Looks up the channel for the given remote address on the wrapped server and returns
     * it as an {@link ExchangeChannel} via {@code HeaderExchangeChannel.getOrAddChannel}.
     */
    public ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress) {
        Channel channel = server.getChannel(remoteAddress);
        return HeaderExchangeChannel.getOrAddChannel(channel);
    }

    /**
     * Same content as {@link #getExchangeChannels()}, viewed as plain {@link Channel}s.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public Collection<Channel> getChannels() {
        // Collection<ExchangeChannel> is not a Collection<Channel> for the compiler,
        // hence the raw cast.
        return (Collection) getExchangeChannels();
    }

    /**
     * Same as {@link #getExchangeChannel(InetSocketAddress)}, typed as {@link Channel}.
     */
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return getExchangeChannel(remoteAddress);
    }

    /** Delegates to the wrapped server. */
    public boolean isBound() {
        return server.isBound();
    }

    /** Delegates to the wrapped server. */
    public InetSocketAddress getLocalAddress() {
        return server.getLocalAddress();
    }

    /** Delegates to the wrapped server. */
    public URL getUrl() {
        return server.getUrl();
    }

    /** Delegates to the wrapped server. */
    public ChannelHandler getChannelHandler() {
        return server.getChannelHandler();
    }

    /**
     * Resets the wrapped server with the given URL and, if the URL carries heartbeat
     * settings that differ from the current ones, restarts the heartbeat timer with them.
     * <p>
     * Any failure while applying the heartbeat settings (including an invalid
     * interval/timeout combination) is logged and not propagated.
     *
     * @param url URL carrying the new parameters
     */
    public void reset(URL url) {
        server.reset(url);
        try {
            if (url.hasParameter(Constants.HEARTBEAT_KEY)
                    || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                if (t < h * 2) {
                    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                }
                if (h != heartbeat || t != heartbeatTimeout) {
                    heartbeat = h;
                    heartbeatTimeout = t;
                    startHeartbeatTimer();
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    /**
     * Legacy variant of {@link #reset(URL)} that merges the given parameters into the
     * current URL first.
     */
    @Deprecated
    public void reset(com.alibaba.dubbo.common.Parameters parameters) {
        reset(getUrl().addParameters(parameters.getParameters()));
    }

    /**
     * Sends the message through the wrapped server.
     *
     * @throws RemotingException if this exchange server has been closed, or if the wrapped
     *                           server fails to send
     */
    public void send(Object message) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message);
    }

    /**
     * Sends the message through the wrapped server, passing {@code sent} along unchanged.
     *
     * @throws RemotingException if this exchange server has been closed, or if the wrapped
     *                           server fails to send
     */
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message, sent);
    }

    /**
     * Cancels any running heartbeat task and, when the heartbeat interval is positive,
     * schedules a new {@link HeartBeatTask} with the current interval and timeout.
     */
    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        public Collection<Channel> getChannels() {
                            // Hand the task a read-only view of the current channels.
                            return Collections.unmodifiableCollection(
                                    HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Cancels the scheduled heartbeat task, if any, and clears the handle. Never throws;
     * failures are logged.
     */
    private void stopHeartbeatTimer() {
        try {
            ScheduledFuture<?> timer = heartbeatTimer;
            if (timer != null && !timer.isCancelled()) {
                timer.cancel(true);
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        } finally {
            heartbeatTimer = null;
        }
    }
}