blob: f4cd06f3ef60e0c498330d09c8e99027f48bdb55 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.amqp_1_0.framing;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.transport.Open;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
public class ConnectionHandler
{
private final ConnectionEndpoint _connection;
private ProtocolHandler _delegate;
private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
public ConnectionHandler(final ConnectionEndpoint connection)
{
_connection = connection;
_delegate = new ProtocolHeaderHandler(connection);
}
public boolean parse(ByteBuffer in)
{
while(in.hasRemaining() && !isDone())
{
_delegate = _delegate.parse(in);
}
return isDone();
}
public boolean isDone()
{
return _delegate.isDone();
}
// ----------------------------------------------------------------
public static class FrameOutput<T> implements FrameOutputHandler<T>, FrameSource
{
private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
private final BlockingQueue<AMQFrame<T>> _queue = new ArrayBlockingQueue<AMQFrame<T>>(100);
private ConnectionEndpoint _conn;
private final AMQFrame<T> _endOfFrameMarker = new AMQFrame<T>(null)
{
@Override public short getChannel()
{
throw new UnsupportedOperationException();
}
@Override public byte getFrameType()
{
throw new UnsupportedOperationException();
}
};
private boolean _setForClose;
private boolean _closed;
private long _nextHeartbeat;
public FrameOutput(final ConnectionEndpoint conn)
{
_conn = conn;
}
public boolean canSend()
{
return _queue.remainingCapacity() != 0;
}
public void send(AMQFrame<T> frame)
{
send(frame, null);
}
public void send(final AMQFrame<T> frame, final ByteBuffer payload)
{
synchronized(_conn.getLock())
{
try
{
// TODO HACK - check frame length
int size = _conn.getDescribedTypeRegistry()
.getValueWriter(frame.getFrameBody()).writeToBuffer(EMPTY_BYTEBUFFER) + 8;
if(size > _conn.getMaxFrameSize())
{
throw new OversizeFrameException(frame, size);
}
while(!_queue.offer(frame))
{
_conn.getLock().wait(1000L);
}
_conn.getLock().notifyAll();
}
catch (InterruptedException e)
{
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
public void close()
{
synchronized (_conn.getLock())
{
if(!_queue.offer(_endOfFrameMarker))
{
_setForClose = true;
}
_conn.getLock().notifyAll();
}
}
public AMQFrame<T> getNextFrame(final boolean wait)
{
synchronized(_conn.getLock())
{
long time = System.currentTimeMillis();
try
{
AMQFrame frame = null;
while(!closed() && (frame = _queue.poll()) == null && wait)
{
_conn.getLock().wait(_conn.getIdleTimeout()/2);
if(_conn.getIdleTimeout()>0)
{
time = System.currentTimeMillis();
if(frame == null && time > _nextHeartbeat)
{
frame = new TransportFrame((short) 0,null);
break;
}
}
}
if(frame != null)
{
_nextHeartbeat = time + _conn.getIdleTimeout()/2;
}
if(frame == _endOfFrameMarker)
{
_closed = true;
frame = null;
}
else if(_setForClose && frame != null)
{
_setForClose = !_queue.offer(_endOfFrameMarker);
}
if(frame != null && FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + _conn.getRemoteAddress() + "|" + frame.getChannel() + "] : " + frame.getFrameBody());
}
_conn.getLock().notifyAll();
return frame;
}
catch (InterruptedException e)
{
_conn.setClosedForOutput(true);
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
return null;
}
}
}
public boolean closed()
{
return _closed;
}
}
public static interface FrameSource<T>
{
AMQFrame<T> getNextFrame(boolean wait);
boolean closed();
}
public static interface BytesSource
{
void getBytes(BytesProcessor processor, boolean wait);
boolean closed();
}
public static class FrameToBytesSourceAdapter implements BytesSource
{
private final FrameSource _frameSource;
private final FrameWriter _writer;
private static final int BUF_SIZE = 1<<16;
private final byte[] _bytes = new byte[BUF_SIZE];
private final ByteBuffer _buffer = ByteBuffer.wrap(_bytes);
public FrameToBytesSourceAdapter(final FrameSource frameSource, ValueWriter.Registry registry)
{
_frameSource = frameSource;
_writer = new FrameWriter(registry);
}
public void getBytes(final BytesProcessor processor, final boolean wait)
{
AMQFrame frame;
if(_buffer.position() == 0 && !_frameSource.closed())
{
if(!_writer.isComplete())
{
_writer.writeToBuffer(_buffer);
}
while(_buffer.hasRemaining())
{
if((frame = _frameSource.getNextFrame(wait && _buffer.position()==0)) != null)
{
_writer.setValue(frame);
try
{
_writer.writeToBuffer(_buffer);
}
catch(RuntimeException e)
{
e.printStackTrace();
throw e;
}
catch(Error e)
{
e.printStackTrace();
throw e;
}
}
else
{
break;
}
}
_buffer.flip();
}
if(_buffer.limit() != 0)
{
processor.processBytes(_buffer);
if(_buffer.remaining() == 0)
{
_buffer.clear();
}
}
}
public boolean closed()
{
return _buffer.position() == 0 && _frameSource.closed();
}
}
public static class HeaderBytesSource implements BytesSource
{
private final ByteBuffer _buffer;
private ConnectionEndpoint _conn;
public HeaderBytesSource(ConnectionEndpoint conn, byte... headerBytes)
{
_conn = conn;
_buffer = ByteBuffer.wrap(headerBytes);
}
public void getBytes(final BytesProcessor processor, final boolean wait)
{
if(!_conn.closedForOutput())
{
processor.processBytes(_buffer);
}
}
public boolean closed()
{
return !_conn.closedForOutput() && !_buffer.hasRemaining();
}
}
public static class SequentialBytesSource implements BytesSource
{
private Queue<BytesSource> _sources = new LinkedList<BytesSource>();
public SequentialBytesSource(BytesSource... sources)
{
_sources.addAll(Arrays.asList(sources));
}
public synchronized void addSource(BytesSource source)
{
_sources.add(source);
}
public synchronized void getBytes(final BytesProcessor processor, final boolean wait)
{
BytesSource src = _sources.peek();
while (src != null && src.closed())
{
_sources.poll();
src = _sources.peek();
}
if(src != null)
{
src.getBytes(processor, wait);
}
}
public boolean closed()
{
return _sources.isEmpty();
}
}
public static class BytesOutputHandler implements Runnable, BytesProcessor
{
private final OutputStream _outputStream;
private BytesSource _bytesSource;
private boolean _closed;
private ConnectionEndpoint _conn;
public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn)
{
_outputStream = outputStream;
_bytesSource = source;
_conn = conn;
}
public void run()
{
final BytesSource bytesSource = _bytesSource;
while(!(_closed || bytesSource.closed()))
{
_bytesSource.getBytes(this, true);
}
}
public void processBytes(final ByteBuffer buf)
{
try
{
if(RAW_LOGGER.isLoggable(Level.FINE))
{
Binary bin = new Binary(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
RAW_LOGGER.fine("SEND["+ _conn.getRemoteAddress() +"] : " + bin.toString());
}
_outputStream.write(buf.array(),buf.arrayOffset()+buf.position(), buf.limit()-buf.position());
buf.position(buf.limit());
}
catch (IOException e)
{
_closed = true;
e.printStackTrace(); //TODO
}
}
}
public static class OutputHandler implements Runnable
{
private final OutputStream _outputStream;
private FrameSource _frameSource;
private static final int BUF_SIZE = 1<<16;
private ValueWriter.Registry _registry;
public OutputHandler(OutputStream outputStream, FrameSource source, ValueWriter.Registry registry)
{
_outputStream = outputStream;
_frameSource = source;
_registry = registry;
}
public void run()
{
int i=0;
try
{
byte[] buffer = new byte[BUF_SIZE];
ByteBuffer buf = ByteBuffer.wrap(buffer);
buf.put((byte)'A');
buf.put((byte)'M');
buf.put((byte)'Q');
buf.put((byte)'P');
buf.put((byte) 0);
buf.put((byte) 1);
buf.put((byte) 0);
buf.put((byte) 0);
final FrameSource frameSource = _frameSource;
AMQFrame frame;
FrameWriter writer = new FrameWriter(_registry);
while(!frameSource.closed())
{
if(!writer.isComplete())
{
writer.writeToBuffer(buf);
}
while(buf.hasRemaining())
{
if((frame = frameSource.getNextFrame(buf.position()==0)) != null)
{
writer.setValue(frame);
int size = writer.writeToBuffer(buf);
}
else
{
break;
}
}
if(buf.limit() != 0)
{
_outputStream.write(buffer,0, buf.position());
buf.clear();
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws AmqpErrorException
{
byte[] buffer = new byte[76];
ByteBuffer buf = ByteBuffer.wrap(buffer);
AMQPDescribedTypeRegistry registry = AMQPDescribedTypeRegistry.newInstance()
.registerTransportLayer()
.registerMessagingLayer()
.registerTransactionLayer();
Open open = new Open();
// Open(container_id="venture", channel_max=10, hostname="foo", offered_capabilities=[Symbol("one"), Symbol("two"), Symbol("three")])
open.setContainerId("venture");
open.setChannelMax(UnsignedShort.valueOf((short) 10));
open.setHostname("foo");
open.setOfferedCapabilities(new Symbol[] {Symbol.valueOf("one"),Symbol.valueOf("two"),Symbol.valueOf("three")});
ValueWriter<Open> writer = registry.getValueWriter(open);
System.out.println("------ Encode (time in ms for 1 million opens)");
Long myLong = Long.valueOf(32);
ValueWriter<Long> writer2 = registry.getValueWriter(myLong);
Double myDouble = Double.valueOf(3.14159265359);
ValueWriter<Double> writer3 = registry.getValueWriter(myDouble);
for(int n = 0; n < 1/*00*/; n++)
{
long startTime = System.currentTimeMillis();
for(int i = 1/*000000*/; i !=0; i--)
{
buf.position(0);
writer.setValue(open);
writer.writeToBuffer(buf);
writer2.setValue(myLong);
writer.writeToBuffer(buf);
writer3.setValue(myDouble);
writer3.writeToBuffer(buf);
}
long midTime = System.currentTimeMillis();
System.out.println((midTime - startTime));
}
ValueHandler handler = new ValueHandler(registry);
System.out.println("------ Decode (time in ms for 1 million opens)");
for(int n = 0; n < 100; n++)
{
long startTime = System.currentTimeMillis();
for(int i = 1000000; i !=0; i--)
{
buf.flip();
handler.parse(buf);
handler.parse(buf);
handler.parse(buf);
}
long midTime = System.currentTimeMillis();
System.out.println((midTime - startTime));
}
}
}