blob: 6ab7167d90c5e80a667956206fc28f59aa18a901 [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.exchange.support.header;
import java.net.InetSocketAddress;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.ExecutionException;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ExchangeChannel;
import com.alibaba.dubbo.remoting.exchange.ExchangeHandler;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.exchange.Response;
import com.alibaba.dubbo.remoting.exchange.support.DefaultFuture;
import com.alibaba.dubbo.remoting.transport.ChannelHandlerDelegate;
/**
* ExchangeReceiver
*
* @author william.liangf
* @author chao.liuc
*/
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
private final ExchangeHandler handler;
public HeaderExchangeHandler(ExchangeHandler handler){
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.handler = handler;
}
void handlerEvent(Channel channel, Request req) throws RemotingException{
if (req.getData() != null && req.getData().equals(Request.READONLY_EVENT)){
channel.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
}
if (req.isTwoWay()){
if (req.isHeartbeat()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(req.getData() == null ? null : req.getData().toString());
channel.send(res);
}
}
}
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) msg = null;
else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
else msg = data.toString();
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
public void connected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public void disconnected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.disconnected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
private static boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(url.getIp())
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()){
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
if (exception instanceof ExecutionException) {
ExecutionException e = (ExecutionException) exception;
Object msg = e.getRequest();
if (msg instanceof Request) {
Request req = (Request) msg;
if (req.isTwoWay() && ! req.isHeartbeat()) {
Response res = new Response(req.getId(), req.getVersion());
res.setStatus(Response.SERVER_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
return;
}
}
}
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.caught(exchangeChannel, exception);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public ChannelHandler getHandler() {
if (handler instanceof ChannelHandlerDelegate) {
return ((ChannelHandlerDelegate) handler).getHandler();
} else {
return handler;
}
}
}