blob: 2d235ab524515115c424ad285c97f1e13f36092b [file] [log] [blame]
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport.grizzly;
import java.io.IOException;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.io.Bytes;
import com.alibaba.dubbo.common.io.UnsafeByteArrayInputStream;
import com.alibaba.dubbo.common.io.UnsafeByteArrayOutputStream;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.exchange.Response;
/**
* GrizzlyCodecAdapter
*
* @author william.liangf
*/
public class GrizzlyCodecAdapter extends BaseFilter {
private static final String BUFFER_KEY = GrizzlyCodecAdapter.class.getName() + ".BUFFER";
private final Codec upstreamCodec;
private final Codec downstreamCodec;
private final URL url;
private final ChannelHandler handler;
private final int bufferSize;
public GrizzlyCodecAdapter(Codec codec, URL url, ChannelHandler handler){
this(codec, codec, url, handler);
}
/**
* server 端如果有消息发送需要分开codec,默认的上行code是dubbo1兼容的
*/
public GrizzlyCodecAdapter(Codec upstreamCodec, Codec downstreamCodec, URL url, ChannelHandler handler){
this.upstreamCodec = upstreamCodec;
this.downstreamCodec = downstreamCodec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
@Override
public NextAction handleWrite(FilterChainContext context) throws IOException {
Connection<?> connection = context.getConnection();
GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
try {
UnsafeByteArrayOutputStream output = new UnsafeByteArrayOutputStream(1024); // 不需要关闭
if(!(context.getMessage() instanceof Response)){
downstreamCodec.encode(channel, output, context.getMessage());
}else{
upstreamCodec.encode(channel, output, context.getMessage());
}
GrizzlyChannel.removeChannelIfDisconnectd(connection);
byte[] bytes = output.toByteArray();
Buffer buffer = connection.getTransport().getMemoryManager().allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
buffer.allowBufferDispose(true);
context.setMessage(buffer);
} finally {
GrizzlyChannel.removeChannelIfDisconnectd(connection);
}
return context.getInvokeAction();
}
@Override
public NextAction handleRead(FilterChainContext context) throws IOException {
Object message = context.getMessage();
Connection<?> connection = context.getConnection();
Channel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
try {
if (message instanceof Buffer) { // 收到新的数据包
Buffer buffer = (Buffer) message; // 缓存
int readable = buffer.capacity(); // 本次可读取新数据的大小
if (readable == 0) {
return context.getStopAction();
}
byte[] bytes; // byte[]缓存区,将Buffer转成byte[],再转成UnsafeByteArrayInputStream
int offset; // 指向已用数据的偏移量,off之前的数据都是已用过的
int limit; // 有效长度,limit之后的长度是空白或无效数据,off到limit之间的数据是准备使用的有效数据
Object[] remainder = (Object[]) channel.getAttribute(BUFFER_KEY); // 上次序列化剩下的数据
channel.removeAttribute(BUFFER_KEY);
if (remainder == null) { // 如果没有,创建新的bytes缓存
bytes = new byte[bufferSize];
offset = 0;
limit = 0;
} else { // 如果有,使用剩下的bytes缓存
bytes = (byte[]) remainder[0];
offset = (Integer) remainder[1];
limit = (Integer) remainder[2];
}
return receive(context, channel, buffer, readable, bytes, offset, limit);
} else if (message instanceof Object[]) { // 同一Buffer多轮Filter,即:一个Buffer里有多个请求
Object[] remainder = (Object[]) message;
Buffer buffer = (Buffer) remainder[0];
int readable = (Integer) remainder[1];
byte[] bytes = (byte[]) remainder[2];
int offset = (Integer) remainder[3];
int limit = (Integer) remainder[4];
return receive(context, channel, buffer, readable, bytes, offset, limit);
} else { // 其它事件直接往下传
return context.getInvokeAction();
}
} finally {
GrizzlyChannel.removeChannelIfDisconnectd(connection);
}
}
/*
* 接收
*
* @param context 上下文
* @param channel 通道
* @param buffer 缓存
* @param readable 缓存可读
* @param bytes 输入缓存
* @param offset 指向已读数据的偏移量,off之前的数据都是已用过的
* @param limit 有效长度,limit之后的长度是空白或无效数据,off到limit之间的数据是准备使用的数据
* @return 后续动作
* @throws IOException
*/
private NextAction receive(FilterChainContext context, Channel channel, Buffer buffer, int readable, byte[] bytes, int offset, int limit) throws IOException {
for(;;) {
int read = Math.min(readable, bytes.length - limit); // 取bytes缓存空闲区,和可读取新数据,的最小值,即:此次最多读写数据的大小
buffer.get(bytes, limit, read); // 从可读取新数据中,读取数据,尽量填满bytes缓存空闲区
limit += read; // 有效数据变长
readable -= read; // 可读数据变少
UnsafeByteArrayInputStream input = new UnsafeByteArrayInputStream(bytes, offset, limit - offset); // 将bytes缓存转成InputStream,不需要关闭
Object msg = upstreamCodec.decode(channel, input); // 调用Codec接口,解码数据
if (msg == Codec.NEED_MORE_INPUT) { // 如果Codec觉得数据不够,不足以解码成一个对象
if (readable == 0) { // 如果没有更多可读数据
channel.setAttribute(BUFFER_KEY, new Object[] { bytes, offset, limit }); // 放入通道属性中,等待下一个Buffer的到来
return context.getStopAction();
} else { // 扩充或挪出空闲区,并循环,直到可读数据都加载到bytes缓存
if (offset == 0) { // 如果bytes缓存全部没有被使用,如果这时数据还不够
bytes = Bytes.copyOf(bytes, bytes.length << 1); // 将bytes缓存扩大一倍
} else { // 如果bytes缓存有一段数据已被使用
int len = limit - offset; // 计算有效数据长度
System.arraycopy(bytes, offset, bytes, 0, len); // 将数据向前移到,压缩到已使用的部分,这样limit后面就会多出一些空闲,可以放数据
offset = 0; // 移到后,bytes缓存没有数据被使用
limit = len; // 移到后,有效数据都在bytes缓存最前面
}
}
} else { // 如果解析出一个结果
int position = input.position(); // 记录InputStream用了多少
if (position == offset) { // 如果InputStream没有被读过,就返回了数据,直接报错,否则InputStream永远读不完,会死递归
throw new IOException("Decode without read data.");
}
offset = position; // 记录已读数据
context.setMessage(msg); // 将消息改为解码后的对象,以便被后面的Filter使用。
if (limit - offset > 0 || readable > 0) { // 如果有效数据没有被读完,或者Buffer区还有未读数据
return context.getInvokeAction(new Object[] { buffer, readable, bytes, offset, limit }); // 正常执行完Filter,并重新发起一轮Filter,继续读
} else { // 否则所有数据读完
return context.getInvokeAction(); // 正常执行完Filter
}
}
}
}
}