blob: 2441ae9fd3a7e5af21b1ebc10b7f8bca9f132589 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.udp;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;
import org.apache.activemq.command.LastPartialCommand;
import org.apache.activemq.command.PartialCommand;
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.reliable.ReplayBuffer;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A strategy for reading datagrams and de-fragmenting them together.
*
*
*/
public class CommandDatagramChannel extends CommandChannelSupport {
private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramChannel.class);
private DatagramChannel channel;
private ByteBufferPool bufferPool;
// reading
private Object readLock = new Object();
private ByteBuffer readBuffer;
// writing
private Object writeLock = new Object();
private int defaultMarshalBufferSize = 64 * 1024;
private volatile int receiveCounter;
public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
DatagramChannel channel, ByteBufferPool bufferPool) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
this.channel = channel;
this.bufferPool = bufferPool;
}
public void start() throws Exception {
bufferPool.setDefaultSize(datagramSize);
bufferPool.start();
readBuffer = bufferPool.borrowBuffer();
}
public void stop() throws Exception {
bufferPool.stop();
}
public Command read() throws IOException {
Command answer = null;
Endpoint from = null;
synchronized (readLock) {
while (true) {
readBuffer.clear();
SocketAddress address = channel.receive(readBuffer);
readBuffer.flip();
if (readBuffer.limit() == 0) {
continue;
}
receiveCounter++;
from = headerMarshaller.createEndpoint(readBuffer, address);
int remaining = readBuffer.remaining();
byte[] data = new byte[remaining];
readBuffer.get(data);
// TODO could use a DataInput implementation that talks direct
// to
// the ByteBuffer to avoid object allocation and unnecessary
// buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
answer = (Command)wireFormat.unmarshal(dataIn);
break;
}
}
if (answer != null) {
answer.setFrom(from);
if (LOG.isDebugEnabled()) {
LOG.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
}
}
return answer;
}
public void write(Command command, SocketAddress address) throws IOException {
synchronized (writeLock) {
ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
wireFormat.marshal(command, new DataOutputStream(largeBuffer));
byte[] data = largeBuffer.toByteArray();
int size = data.length;
ByteBuffer writeBuffer = bufferPool.borrowBuffer();
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
if (size > writeBuffer.remaining()) {
// lets split the command up into chunks
int offset = 0;
boolean lastFragment = false;
int length = data.length;
for (int fragment = 0; !lastFragment; fragment++) {
// write the header
if (fragment > 0) {
writeBuffer = bufferPool.borrowBuffer();
writeBuffer.clear();
headerMarshaller.writeHeader(command, writeBuffer);
}
int chunkSize = writeBuffer.remaining();
// we need to remove the amount of overhead to write the
// partial command
// lets write the flags in there
BooleanStream bs = null;
if (wireFormat.isTightEncodingEnabled()) {
bs = new BooleanStream();
bs.writeBoolean(true); // the partial data byte[] is
// never null
}
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
// data type + the command ID + size of the partial data
chunkSize -= 1 + 4 + 4;
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
} else {
chunkSize -= 1;
}
if (!wireFormat.isSizePrefixDisabled()) {
// lets write the size of the command buffer
writeBuffer.putInt(chunkSize);
chunkSize -= 4;
}
lastFragment = offset + chunkSize >= length;
if (chunkSize + offset > length) {
chunkSize = length - offset;
}
if (lastFragment) {
writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
} else {
writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
}
if (bs != null) {
bs.marshal(writeBuffer);
}
int commandId = command.getCommandId();
if (fragment > 0) {
commandId = sequenceGenerator.getNextSequenceId();
}
writeBuffer.putInt(commandId);
if (bs == null) {
writeBuffer.put((byte)1);
}
// size of byte array
writeBuffer.putInt(chunkSize);
// now the data
writeBuffer.put(data, offset, chunkSize);
offset += chunkSize;
sendWriteBuffer(commandId, address, writeBuffer, false);
}
} else {
writeBuffer.put(data);
sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
}
}
}
// Properties
// -------------------------------------------------------------------------
public ByteBufferPool getBufferPool() {
return bufferPool;
}
/**
* Sets the implementation of the byte buffer pool to use
*/
public void setBufferPool(ByteBufferPool bufferPool) {
this.bufferPool = bufferPool;
}
// Implementation methods
// -------------------------------------------------------------------------
protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
// lets put the datagram into the replay buffer first to prevent timing
// issues
ReplayBuffer bufferCache = getReplayBuffer();
if (bufferCache != null && !redelivery) {
bufferCache.addBuffer(commandId, writeBuffer);
}
writeBuffer.flip();
if (LOG.isDebugEnabled()) {
String text = redelivery ? "REDELIVERING" : "sending";
LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
}
channel.send(writeBuffer, address);
}
public void sendBuffer(int commandId, Object buffer) throws IOException {
if (buffer != null) {
ByteBuffer writeBuffer = (ByteBuffer)buffer;
sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
} else {
if (LOG.isWarnEnabled()) {
LOG.warn("Request for buffer: " + commandId + " is no longer present");
}
}
}
public int getReceiveCounter() {
return receiveCounter;
}
}