blob: d66964b3bbf78e0adcb9e15148b6e37d6807f7f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.engine.impl;
import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
import static org.apache.qpid.proton.engine.impl.TransportTestHelper.stringOfLength;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.amqp.transport.Role;
import org.apache.qpid.proton.amqp.transport.Transfer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.framing.TransportFrame;
import org.apache.qpid.proton.message.Message;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
public class TransportImplTest
{
private TransportImpl _transport = new TransportImpl();
private static final int CHANNEL_ID = 1;
private static final TransportFrame TRANSPORT_FRAME_BEGIN = new TransportFrame(CHANNEL_ID, new Begin(), null);
private static final TransportFrame TRANSPORT_FRAME_OPEN = new TransportFrame(CHANNEL_ID, new Open(), null);
private static final int BUFFER_SIZE = 8 * 1024;
@Rule
public ExpectedException _expectedException = ExpectedException.none();
@Test
public void testInput()
{
ByteBuffer buffer = _transport.getInputBuffer();
buffer.put(HEADER);
_transport.processInput().checkIsOk();
assertNotNull(_transport.getInputBuffer());
}
@Test
public void testInitialProcessIsNoop()
{
_transport.process();
}
@Test
public void testProcessIsIdempotent()
{
_transport.process();
_transport.process();
}
/**
* Empty input is always allowed by {@link Transport#getInputBuffer()} and
* {@link Transport#processInput()}, in contrast to the old API.
*
* @see TransportImplTest#testEmptyInputBeforeBindUsingOldApi_causesTransportException()
*/
@Test
public void testEmptyInput_isAllowed()
{
_transport.getInputBuffer();
_transport.processInput().checkIsOk();
}
/**
* Tests the end-of-stream behaviour specified by {@link Transport#input(byte[], int, int)}.
*/
@Test
public void testEmptyInputBeforeBindUsingOldApi_causesTransportException()
{
_expectedException.expect(TransportException.class);
_expectedException.expectMessage("Unexpected EOS when remote connection not closed: connection aborted");
_transport.input(new byte [0], 0, 0);
}
/**
* TODO it's not clear why empty input is specifically allowed in this case.
*/
@Test
public void testEmptyInputWhenRemoteConnectionIsClosedUsingOldApi_isAllowed()
{
ConnectionImpl connection = new ConnectionImpl();
_transport.bind(connection);
connection.setRemoteState(EndpointState.CLOSED);
_transport.input(new byte [0], 0, 0);
}
@Test
public void testOutupt()
{
{
// TransportImpl's underlying output spontaneously outputs the AMQP header
final ByteBuffer outputBuffer = _transport.getOutputBuffer();
assertEquals(HEADER.length, outputBuffer.remaining());
byte[] outputBytes = new byte[HEADER.length];
outputBuffer.get(outputBytes);
assertArrayEquals(HEADER, outputBytes);
_transport.outputConsumed();
}
{
final ByteBuffer outputBuffer = _transport.getOutputBuffer();
assertEquals(0, outputBuffer.remaining());
_transport.outputConsumed();
}
}
@Test
public void testOutputBufferIsReadOnly()
{
doTestTransportBufferReadability(true, false);
}
@Test
public void testOutputBufferNotReadOnlyWhenConfigured()
{
doTestTransportBufferReadability(false, false);
}
@Test
public void testHeadIsReadOnly()
{
doTestTransportBufferReadability(true, true);
}
@Test
public void testHeadNotReadOnlyWhenConfigured()
{
doTestTransportBufferReadability(false, true);
}
private void doTestTransportBufferReadability(boolean readOnly, boolean headOrOutput)
{
TransportImpl transport = new TransportImpl();
// Default should be Read-Only
if (!readOnly) {
transport.setUseReadOnlyOutputBuffer(readOnly);
}
final ByteBuffer outputBuffer;
if (headOrOutput) {
outputBuffer = transport.head();
} else {
outputBuffer = transport.getOutputBuffer();
}
assertTrue(outputBuffer.hasRemaining());
if (readOnly) {
assertTrue(outputBuffer.isReadOnly());
} else {
assertFalse(outputBuffer.isReadOnly());
}
byte[] outputBytes = new byte[outputBuffer.remaining()];
outputBuffer.get(outputBytes);
transport.outputConsumed();
final ByteBuffer emptyBuffer;
if (headOrOutput) {
emptyBuffer = transport.head();
} else {
emptyBuffer = transport.getOutputBuffer();
}
assertFalse(emptyBuffer.hasRemaining());
if (readOnly) {
assertTrue(emptyBuffer.isReadOnly());
} else {
assertFalse(emptyBuffer.isReadOnly());
}
}
@Test
public void testTransportInitiallyHandlesFrames()
{
assertTrue(_transport.isHandlingFrames());
}
@Test
public void testBoundTransport_continuesToHandleFrames()
{
Connection connection = new ConnectionImpl();
assertTrue(_transport.isHandlingFrames());
_transport.bind(connection);
assertTrue(_transport.isHandlingFrames());
_transport.handleFrame(TRANSPORT_FRAME_OPEN);
assertTrue(_transport.isHandlingFrames());
}
@Test
public void testUnboundTransport_stopsHandlingFrames()
{
assertTrue(_transport.isHandlingFrames());
_transport.handleFrame(TRANSPORT_FRAME_OPEN);
assertFalse(_transport.isHandlingFrames());
}
@Test
public void testHandleFrameWhenNotHandling_throwsIllegalStateException()
{
assertTrue(_transport.isHandlingFrames());
_transport.handleFrame(TRANSPORT_FRAME_OPEN);
assertFalse(_transport.isHandlingFrames());
_expectedException.expect(IllegalStateException.class);
_transport.handleFrame(TRANSPORT_FRAME_BEGIN);
}
@Test
public void testOutputTooBigToBeWrittenInOneGo()
{
int smallMaxFrameSize = 512;
_transport = new TransportImpl(smallMaxFrameSize);
Connection conn = new ConnectionImpl();
_transport.bind(conn);
// Open frame sized in order to produce a frame that will almost fill output buffer
conn.setHostname(stringOfLength("x", 500));
conn.open();
// Close the connection to generate a Close frame which will cause an overflow
// internally - we'll get the remaining bytes on the next interaction.
conn.close();
ByteBuffer buf = _transport.getOutputBuffer();
assertEquals("Expecting buffer to be full", smallMaxFrameSize, buf.remaining());
buf.position(buf.limit());
_transport.outputConsumed();
buf = _transport.getOutputBuffer();
assertTrue("Expecting second buffer to have bytes", buf.remaining() > 0);
assertTrue("Expecting second buffer to not be full", buf.remaining() < Transport.MIN_MAX_FRAME_SIZE);
}
@Test
public void testAttemptToInitiateSaslAfterProcessingBeginsCausesIllegalStateException()
{
_transport.process();
try
{
_transport.sasl();
}
catch(IllegalStateException ise)
{
//expected, sasl must be initiated before processing begins
}
}
@Test
public void testChannelMaxDefault() throws Exception
{
Transport transport = Proton.transport();
assertEquals("Unesxpected value for channel-max", 65535, transport.getChannelMax());
}
@Test
public void testSetGetChannelMax() throws Exception
{
Transport transport = Proton.transport();
int channelMax = 456;
transport.setChannelMax(channelMax);
assertEquals("Unesxpected value for channel-max", channelMax, transport.getChannelMax());
}
@Test
public void testSetChannelMaxOutsideLegalUshortRangeThrowsIAE() throws Exception
{
Transport transport = Proton.transport();
try {
transport.setChannelMax( 1 << 16);
fail("Expected exception to be thrown");
} catch (IllegalArgumentException iae ){
// Expected
}
try {
transport.setChannelMax(-1);
fail("Expected exception to be thrown");
} catch (IllegalArgumentException iae ){
// Expected
}
}
private class MockTransportImpl extends TransportImpl
{
public MockTransportImpl() {
super();
}
public MockTransportImpl(int maxFrameSize) {
super(maxFrameSize);
}
LinkedList<FrameBody> writes = new LinkedList<FrameBody>();
@Override
protected void writeFrame(int channel, FrameBody frameBody,
ReadableBuffer payload, Runnable onPayloadTooLarge) {
super.writeFrame(channel, frameBody, payload, onPayloadTooLarge);
writes.addLast(frameBody);
}
}
@Test
public void testTickRemoteTimeout()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
int timeout = 4000;
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(4000));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(0);
assertEquals("Expected to be returned a deadline of 2000", 2000, deadline); // deadline = 4000 / 2
deadline = transport.tick(1000); // Wait for less than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", 2000, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 0, transport.writes.size());
deadline = transport.tick(timeout/2); // Wait for the deadline - next deadline should be (4000/2)*2
assertEquals("When the deadline has been reached expected a new deadline to be returned 4000", 4000, deadline);
assertEquals("tick() should have written data", 1, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(0));
transport.writeFrame(CHANNEL_ID, new Begin(), null, null);
while(transport.pending() > 0) transport.pop(transport.head().remaining());
int framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(3000);
assertEquals("Writing data resets the deadline", 5000, deadline);
assertEquals("When the deadline is reset tick() shouldn't write an empty frame", 0, transport.writes.size() - framesWrittenBeforeTick);
transport.writeFrame(CHANNEL_ID, new Attach(), null, null);
assertTrue(transport.pending() > 0);
framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(4000);
assertEquals("Having pending data does not reset the deadline", 5000, deadline);
assertEquals("Having pending data prevents tick() from sending an empty frame", 0, transport.writes.size() - framesWrittenBeforeTick);
}
@Test
public void testTickLocalTimeout()
{
MockTransportImpl transport = new MockTransportImpl();
transport.setIdleTimeout(4000);
Connection connection = Proton.connection();
transport.bind(connection);
transport.handleFrame(TRANSPORT_FRAME_OPEN);
connection.open();
pumpMockTransport(transport);
long deadline = transport.tick(0);
assertEquals("Expected to be returned a deadline of 4000", 4000, deadline);
int framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(1000); // Wait for less than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", 4000, deadline);
assertEquals("Reading data should never result in a frame being written", 0, transport.writes.size() - framesWrittenBeforeTick);
// Protocol header + empty frame
ByteBuffer data = ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00});
processInput(transport, data);
framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(2000);
assertEquals("Reading data data resets the deadline", 6000, deadline);
assertEquals("Reading data should never result in a frame being written", 0, transport.writes.size() - framesWrittenBeforeTick);
assertEquals("Reading data before the deadline should keep the connection open", EndpointState.ACTIVE, connection.getLocalState());
framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(7000);
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
}
/*
* No frames should be written until the Connection object is
* opened, at which point the Open, and Begin frames should
* be pipelined together.
*/
@Test
public void testOpenSessionBeforeOpenConnection()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
Session session = connection.session();
session.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
}
/*
* No frames should be written until the Connection object is
* opened, at which point the Open, Begin, and Attach frames
* should be pipelined together.
*/
@Test
public void testOpenReceiverBeforeOpenConnection()
{
doOpenLinkBeforeOpenConnectionTestImpl(true);
}
/**
* No frames should be written until the Connection object is
* opened, at which point the Open, Begin, and Attach frames
* should be pipelined together.
*/
@Test
public void testOpenSenderBeforeOpenConnection()
{
doOpenLinkBeforeOpenConnectionTestImpl(false);
}
void doOpenLinkBeforeOpenConnectionTestImpl(boolean receiverLink)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
Session session = connection.session();
session.open();
Link link = null;
if(receiverLink)
{
link = session.receiver("myReceiver");
}
else
{
link = session.sender("mySender");
}
link.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
// Now open the connection, expect the Open, Begin, and Attach frames
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
}
/*
* No attach frame should be written before the Session begin is sent.
*/
@Test
public void testOpenReceiverBeforeOpenSession()
{
doOpenLinkBeforeOpenSessionTestImpl(true);
}
/*
* No attach frame should be written before the Session begin is sent.
*/
@Test
public void testOpenSenderBeforeOpenSession()
{
doOpenLinkBeforeOpenSessionTestImpl(false);
}
void doOpenLinkBeforeOpenSessionTestImpl(boolean receiverLink)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Open the connection
connection.open();
// Create but don't open the session
Session session = connection.session();
// Open the link
Link link = null;
if(receiverLink)
{
link = session.receiver("myReceiver");
}
else
{
link = session.sender("mySender");
}
link.open();
pumpMockTransport(transport);
// Expect only an Open frame, no attach should be sent as the session isn't open
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
// Now open the session, expect the Begin
session.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
// Note: an Attach wasn't sent because link is no longer 'modified' after earlier pump. It
// could easily be argued it should, given how the engine generally handles things. Seems
// unlikely to be of much real world concern.
//assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
}
/*
* Verify that no Attach frame is emitted by the Transport should a Receiver
* be opened after the session End frame was sent.
*/
@Test
public void testReceiverAttachAfterEndSent()
{
doLinkAttachAfterEndSentTestImpl(true);
}
/*
* Verify that no Attach frame is emitted by the Transport should a Sender
* be opened after the session End frame was sent.
*/
@Test
public void testSenderAttachAfterEndSent()
{
doLinkAttachAfterEndSentTestImpl(false);
}
void doLinkAttachAfterEndSentTestImpl(boolean receiverLink)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
Link link = null;
if(receiverLink)
{
link = session.receiver("myReceiver");
}
else
{
link = session.sender("mySender");
}
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
// Send the necessary responses to open/begin
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
// Cause a End frame to be sent
session.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof End);
// Open the link and verify the transport doesn't
// send any Attach frame, as an End frame was sent already.
link.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
}
/*
* Verify that no Attach frame is emitted by the Transport should a Receiver
* be closed after the session End frame was sent.
*/
@Test
public void testReceiverCloseAfterEndSent()
{
doLinkDetachAfterEndSentTestImpl(true);
}
/*
* Verify that no Attach frame is emitted by the Transport should a Sender
* be closed after the session End frame was sent.
*/
@Test
public void testSenderCloseAfterEndSent()
{
doLinkDetachAfterEndSentTestImpl(false);
}
void doLinkDetachAfterEndSentTestImpl(boolean receiverLink)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
Link link = null;
if(receiverLink)
{
link = session.receiver("myReceiver");
}
else
{
link = session.sender("mySender");
}
link.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Cause an End frame to be sent
session.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof End);
// Close the link and verify the transport doesn't
// send any Detach frame, as an End frame was sent already.
link.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
}
/*
* No frames should be written until the Connection object is
* opened, at which point the Open and Begin frames should
* be pipelined together.
*/
@Test
public void testReceiverFlowBeforeOpenConnection()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
Session session = connection.session();
session.open();
Receiver reciever = session.receiver("myReceiver");
reciever.flow(5);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
// Now open the connection, expect the Open and Begin frames but
// nothing else as we haven't opened the receiver itself yet.
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
}
@Test
public void testSenderSendBeforeOpenConnection()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
Collector collector = Collector.Factory.create();
connection.collect(collector);
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
sendMessage(sender, "tag1", "content1");
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
// Now open the connection, expect the Open and Begin and Attach frames but
// nothing else as we the sender wont have credit yet.
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach then give sender credit
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
Flow flow = new Flow();
flow.setHandle(UnsignedInteger.ZERO);
flow.setDeliveryCount(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ONE);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Now pump the transport again and expect a transfer for the message
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
}
@Test
public void testEmitFlowEventOnSend()
{
doEmitFlowOnSendTestImpl(true);
}
public void testSupressFlowEventOnSend()
{
doEmitFlowOnSendTestImpl(false);
}
void doEmitFlowOnSendTestImpl(boolean emitFlowEventOnSend)
{
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(emitFlowEventOnSend);
Connection connection = Proton.connection();
transport.bind(connection);
Collector collector = Collector.Factory.create();
connection.collect(collector);
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
sendMessage(sender, "tag1", "content1");
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN,
Event.Type.TRANSPORT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
// Now open the connection, expect the Open and Begin frames but
// nothing else as we haven't opened the receiver itself yet.
connection.open();
pumpMockTransport(transport);
assertEvents(collector, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach then give sender credit
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
Flow flow = new Flow();
flow.setHandle(UnsignedInteger.ZERO);
flow.setDeliveryCount(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ONE);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow, null));
assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
Event.Type.LINK_REMOTE_OPEN, Event.Type.LINK_FLOW);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Now pump the transport again and expect a transfer for the message
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Transfer);
// Verify that we did, or did not, emit a flow event
if(emitFlowEventOnSend)
{
assertEvents(collector, Event.Type.LINK_FLOW);
}
else
{
assertNoEvents(collector);
}
}
/**
* Verify that no Begin frame is emitted by the Transport should a Session open
* after the Close frame was sent.
*/
@Test
public void testSessionBeginAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
// Send the necessary response to Open
transport.handleFrame(new TransportFrame(0, new Open(), null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
// Cause a Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Close);
// Open the session and verify the transport doesn't
// send any Begin frame, as a Close frame was sent already.
session.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
}
/**
* Verify that no End frame is emitted by the Transport should a Session close
* after the Close frame was sent.
*/
@Test
public void testSessionEndAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
// Send the necessary responses to open/begin
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
// Cause a Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Close);
// Close the session and verify the transport doesn't
// send any End frame, as a Close frame was sent already.
session.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
}
@Test
public void testEmittedSessionIncomingWindow()
{
doSessionIncomingWindowTestImpl(false, false);
doSessionIncomingWindowTestImpl(true, false);
doSessionIncomingWindowTestImpl(false, true);
doSessionIncomingWindowTestImpl(true, true);
}
private void doSessionIncomingWindowTestImpl(boolean setFrameSize, boolean setSessionCapacity) {
MockTransportImpl transport;
if(setFrameSize) {
transport = new MockTransportImpl(5*1024);
} else {
transport = new MockTransportImpl();
}
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
int sessionCapacity = 0;
if(setSessionCapacity) {
sessionCapacity = 100*1024;
session.setIncomingCapacity(sessionCapacity);
}
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
// Provide an Open response
transport.handleFrame(new TransportFrame(0, new Open(), null));
// Open session and verify emitted incoming window
session.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
Begin sentBegin = (Begin) transport.writes.get(1);
assertEquals("Unexpected session capacity", sessionCapacity, session.getIncomingCapacity());
int expectedWindowSize = 2147483647;
if(setSessionCapacity && setFrameSize) {
expectedWindowSize = (100*1024) / (5*1024); // capacity / frameSize
}
assertEquals("Unexpected session window", UnsignedInteger.valueOf(expectedWindowSize), sentBegin.getIncomingWindow());
// Open receiver
String linkName = "myReceiver";
Receiver receiver = session.receiver(linkName);
receiver.open();
pumpMockTransport(transport);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Provide an begin+attach response
Begin beginResponse = new Begin();
beginResponse.setRemoteChannel(UnsignedShort.valueOf((short) 0));
beginResponse.setNextOutgoingId(UnsignedInteger.ONE);
beginResponse.setIncomingWindow(UnsignedInteger.valueOf(1024));
beginResponse.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, beginResponse, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.SENDER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Flow some credit, verify emitted incoming window remains the same
receiver.flow(1);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
Flow sentFlow = (Flow) transport.writes.get(3);
assertEquals("Unexpected session window", UnsignedInteger.valueOf(expectedWindowSize), sentFlow.getIncomingWindow());
// Provide a transfer, don't consume it, flow more credit, verify the emitted
// incoming window (should reduce 1 if capacity and frame size set)
String deliveryTag = "tag1";
String messageContent = "content1";
handleTransfer(transport, 1, deliveryTag, messageContent);
assertTrue("Unexpected session byte count", session.getIncomingBytes() > 0);
receiver.flow(1);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Flow);
sentFlow = (Flow) transport.writes.get(4);
if(setSessionCapacity && setFrameSize) {
expectedWindowSize = expectedWindowSize -1;
}
assertEquals("Unexpected session window", UnsignedInteger.valueOf(expectedWindowSize), sentFlow.getIncomingWindow());
// Consume the transfer then flow more credit, verify the emitted
// incoming window (should increase 1 if capacity and frame size set)
verifyDelivery(receiver, deliveryTag, messageContent);
assertEquals("Unexpected session byte count", 0, session.getIncomingBytes());
receiver.flow(1);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(5) instanceof Flow);
sentFlow = (Flow) transport.writes.get(5);
if(setSessionCapacity && setFrameSize) {
expectedWindowSize = expectedWindowSize +1;
}
assertEquals("Unexpected session window", UnsignedInteger.valueOf(expectedWindowSize), sentFlow.getIncomingWindow());
}
/**
* Verify that no Attach frame is emitted by the Transport should a Receiver
* be opened after the Close frame was sent.
*/
@Test
public void testReceiverAttachAfterCloseSent()
{
doLinkAttachAfterCloseSentTestImpl(true);
}
/**
* Verify that no Attach frame is emitted by the Transport should a Sender
* be opened after the Close frame was sent.
*/
@Test
public void testSenderAttachAfterCloseSent()
{
doLinkAttachAfterCloseSentTestImpl(false);
}
void doLinkAttachAfterCloseSentTestImpl(boolean receiverLink)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
Link link = null;
if(receiverLink)
{
link = session.receiver("myReceiver");
}
else
{
link = session.sender("mySender");
}
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
// Send the necessary responses to open/begin
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 2, transport.writes.size());
// Cause a Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Close);
// Open the link and verify the transport doesn't
// send any Attach frame, as a Close frame was sent already.
link.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
}
/**
* Verify that no Flow frame is emitted by the Transport should a Receiver
* have credit added after the Close frame was sent.
*/
@Test
public void testReceiverFlowAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName = "myReceiver";
Receiver receiver = session.receiver(linkName);
receiver.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Cause the Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close);
// Grant new credit for the Receiver and verify the transport doesn't
// send any Flow frame, as a Close frame was sent already.
receiver.flow(1);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
}
/**
* Verify that no Flow frame is emitted by the Transport should a Receiver
* have pending drain when a detach is sent for that receiver.
*/
@Test
public void testNoReceiverFlowAfterDetachSentWhileDraining()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName = "myReceiver";
Receiver receiver = session.receiver(linkName);
receiver.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Start a drain for the Receiver and verify the transport doesn't
// send any Flow frame, due to the detach being initiated.
receiver.drain(10);
pumpMockTransport(transport);
// Cause the Detach frame to be sent
receiver.detach();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Detach);
}
/**
* Verify that no Flow frame is emitted by the Transport should a Sender
* have credit drained added after the Close frame was sent.
*/
@Test
public void testSenderFlowAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Collector collector = Collector.Factory.create();
connection.collect(collector);
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
assertFalse("Should not be in drain yet", sender.getDrain());
// Send the necessary responses to open/begin/attach then give sender credit and drain
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
int credit = 10;
Flow flow = new Flow();
flow.setHandle(UnsignedInteger.ZERO);
flow.setDeliveryCount(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ONE);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow.setDrain(true);
flow.setLinkCredit(UnsignedInteger.valueOf(credit));
transport.handleFrame(new TransportFrame(0, flow, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Should not be in drain", sender.getDrain());
assertEquals("Should have credit", credit, sender.getCredit());
// Cause the Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close);
// Drain the credit and verify the transport doesn't
// send any Flow frame, as a Close frame was sent already.
int drained = sender.drained();
assertEquals("Should have drained all credit", credit, drained);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
}
/**
* Verify that no Disposition frame is emitted by the Transport should a Delivery
* have disposition applied after the Close frame was sent.
*/
@Test
public void testDispositionAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName = "myReceiver";
Receiver receiver = session.receiver(linkName);
receiver.flow(5);
receiver.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Flow);
Delivery delivery = receiver.current();
assertNull("Should not yet have a delivery", delivery);
// Send the necessary responses to open/begin/attach as well as a transfer
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.SENDER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
String deliveryTag = "tag1";
String messageContent = "content1";
handleTransfer(transport, 1, deliveryTag, messageContent);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
delivery = verifyDelivery(receiver, deliveryTag, messageContent);
assertNotNull("Should now have a delivery", delivery);
// Cause the Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Close);
delivery.disposition(Released.getInstance());
delivery.settle();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
}
/**
* Verify that no Transfer frame is emitted by the Transport should a Delivery
* be sendable after the Close frame was sent.
*/
@Test
public void testTransferAfterCloseSent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Collector collector = Collector.Factory.create();
connection.collect(collector);
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach then give sender credit
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
Flow flow = new Flow();
flow.setHandle(UnsignedInteger.ZERO);
flow.setDeliveryCount(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ONE);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Cause the Close frame to be sent
connection.close();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Close);
// Send a new message and verify the transport doesn't
// send any Transfer frame, as a Close frame was sent already.
sendMessage(sender, "tag1", "content1");
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
}
private void assertNoEvents(Collector collector)
{
assertEvents(collector);
}
private void assertEvents(Collector collector, Event.Type... expectedEventTypes)
{
if(expectedEventTypes.length == 0)
{
assertNull("Expected no events, but at least one was present: " + collector.peek(), collector.peek());
}
else
{
ArrayList<Event.Type> eventTypesList = new ArrayList<Event.Type>();
Event event = null;
while ((event = collector.peek()) != null) {
eventTypesList.add(event.getType());
collector.pop();
}
assertArrayEquals("Unexpected event types: " + eventTypesList, expectedEventTypes, eventTypesList.toArray(new Event.Type[0]));
}
}
private void pumpMockTransport(MockTransportImpl transport)
{
while(transport.pending() > 0)
{
transport.pop(transport.head().remaining());
}
}
private String getFrameTypesWritten(MockTransportImpl transport)
{
String result = "";
for(FrameBody f : transport.writes) {
result += f.getClass().getSimpleName();
result += ",";
}
if(result.isEmpty()) {
return "no-frames-written";
} else {
return result;
}
}
private Delivery sendMessage(Sender sender, String deliveryTag, String messageContent)
{
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Message m = Message.Factory.create();
m.setBody(new AmqpValue(messageContent));
byte[] encoded = new byte[BUFFER_SIZE];
int len = m.encode(encoded, 0, BUFFER_SIZE);
assertTrue("given array was too small", len < BUFFER_SIZE);
Delivery delivery = sender.delivery(tag);
int sent = sender.send(encoded, 0, len);
assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
boolean senderAdvanced = sender.advance();
assertTrue("sender has not advanced", senderAdvanced);
return delivery;
}
private void handleTransfer(TransportImpl transport, int deliveryNumber, String deliveryTag, String messageContent)
{
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Message m = Message.Factory.create();
m.setBody(new AmqpValue(messageContent));
byte[] encoded = new byte[BUFFER_SIZE];
int len = m.encode(encoded, 0, BUFFER_SIZE);
assertTrue("given array was too small", len < BUFFER_SIZE);
Transfer transfer = new Transfer();
transfer.setDeliveryId(UnsignedInteger.valueOf(deliveryNumber));
transfer.setHandle(UnsignedInteger.ZERO);
transfer.setDeliveryTag(new Binary(tag));
transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
transport.handleFrame(new TransportFrame(0, transfer, new Binary(encoded, 0, len)));
}
private Delivery verifyDelivery(Receiver receiver, String deliveryTag, String messageContent)
{
Delivery delivery = receiver.current();
assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
assertNull(delivery.getLocalState());
assertNull(delivery.getRemoteState());
assertFalse(delivery.isPartial());
assertTrue(delivery.isReadable());
byte[] received = new byte[BUFFER_SIZE];
int len = receiver.recv(received, 0, BUFFER_SIZE);
assertTrue("given array was too small", len < BUFFER_SIZE);
Message m = Proton.message();
m.decode(received, 0, len);
Object messageBody = ((AmqpValue)m.getBody()).getValue();
assertEquals("Unexpected message content", messageContent, messageBody);
boolean receiverAdvanced = receiver.advance();
assertTrue("receiver has not advanced", receiverAdvanced);
return delivery;
}
private Delivery verifyDeliveryRawPayload(Receiver receiver, String deliveryTag, byte[] payload)
{
Delivery delivery = receiver.current();
assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
assertFalse(delivery.isPartial());
assertTrue(delivery.isReadable());
byte[] received = new byte[delivery.pending()];
int len = receiver.recv(received, 0, BUFFER_SIZE);
assertEquals("unexpected length", len, received.length);
assertArrayEquals("Received delivery payload not as expected", payload, received);
boolean receiverAdvanced = receiver.advance();
assertTrue("receiver has not advanced", receiverAdvanced);
return delivery;
}
/**
* Verify that the {@link TransportInternal#addTransportLayer(TransportLayer)} has the desired
* effect by observing the wrapping effect on related transport input and output methods.
*/
@Test
public void testAddAdditionalTransportLayer()
{
Integer capacityOverride = 1957;
Integer pendingOverride = 2846;
MockTransportImpl transport = new MockTransportImpl();
TransportWrapper mockWrapper = Mockito.mock(TransportWrapper.class);
Mockito.when(mockWrapper.capacity()).thenReturn(capacityOverride);
Mockito.when(mockWrapper.pending()).thenReturn(pendingOverride);
TransportLayer mockLayer = Mockito.mock(TransportLayer.class);
Mockito.when(mockLayer.wrap(Mockito.any(TransportInput.class), Mockito.any(TransportOutput.class))).thenReturn(mockWrapper);
transport.addTransportLayer(mockLayer);
assertEquals("Unexepcted value, layer override not effective", capacityOverride.intValue(), transport.capacity());
assertEquals("Unexepcted value, layer override not effective", pendingOverride.intValue(), transport.pending());
}
@Test
public void testAddAdditionalTransportLayerThrowsISEIfProcessingStarted()
{
MockTransportImpl transport = new MockTransportImpl();
TransportLayer mockLayer = Mockito.mock(TransportLayer.class);
transport.process();
try
{
transport.addTransportLayer(mockLayer);
fail("Expected exception to be thrown due to processing having started");
}
catch (IllegalStateException ise)
{
// expected
}
}
@Test
public void testEndpointOpenAndCloseAreIdempotent()
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
Collector collector = Collector.Factory.create();
connection.collect(collector);
connection.open();
connection.open();
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
pumpMockTransport(transport);
assertEvents(collector, Event.Type.CONNECTION_INIT, Event.Type.CONNECTION_LOCAL_OPEN, Event.Type.TRANSPORT,
Event.Type.SESSION_INIT, Event.Type.SESSION_LOCAL_OPEN,
Event.Type.TRANSPORT, Event.Type.LINK_INIT, Event.Type.LINK_LOCAL_OPEN, Event.Type.TRANSPORT);
pumpMockTransport(transport);
connection.open();
session.open();
sender.open();
assertNoEvents(collector);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach then give sender credit
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
assertEvents(collector, Event.Type.CONNECTION_REMOTE_OPEN, Event.Type.SESSION_REMOTE_OPEN,
Event.Type.LINK_REMOTE_OPEN);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Now close the link and expect one event
sender.close();
sender.close();
assertEvents(collector, Event.Type.LINK_LOCAL_CLOSE, Event.Type.TRANSPORT);
pumpMockTransport(transport);
sender.close();
assertNoEvents(collector);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Detach);
}
@Test
public void testInitialRemoteMaxFrameSizeOverride()
{
MockTransportImpl transport = new MockTransportImpl();
transport.setInitialRemoteMaxFrameSize(768);
assertEquals("Unexpected value : " + getFrameTypesWritten(transport), 768, transport.getRemoteMaxFrameSize());
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 1, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
try
{
transport.setInitialRemoteMaxFrameSize(12345);
fail("expected an exception");
}
catch (IllegalStateException ise )
{
//expected
}
// Send the necessary responses to open
Open open = new Open();
open.setMaxFrameSize(UnsignedInteger.valueOf(4567));
transport.handleFrame(new TransportFrame(0, open, null));
assertEquals("Unexpected value : " + getFrameTypesWritten(transport), 4567, transport.getRemoteMaxFrameSize());
}
@Test
public void testTickWithZeroIdleTimeoutsGivesZeroDeadline()
{
doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(true);
}
@Test
public void testTickWithNullIdleTimeoutsGivesZeroDeadline()
{
doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(false);
}
private void doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(boolean useZero) {
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
while(transport.pending() > 0) {
transport.pop(transport.head().remaining());
}
assertEquals("should have written data", 1, transport.writes.size());
FrameBody sentOpenFrame = transport.writes.get(0);
assertNotNull("should have written a non-empty frame", sentOpenFrame);
assertTrue("should have written an open frame", sentOpenFrame instanceof Open);
assertNull("should not have had an idletimeout value", ((Open)sentOpenFrame).getIdleTimeOut());
// Handle the peer transmitting their open with null/zero timeout.
Open open = new Open();
if(useZero) {
open.setIdleTimeOut(UnsignedInteger.ZERO);
} else {
open.setIdleTimeOut(null);
}
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(0);
assertEquals("Unexpected deadline returned", 0, deadline);
deadline = transport.tick(10);
assertEquals("Unexpected deadline returned", 0, deadline);
}
@Test
public void testTickWithLocalTimeout()
{
// all-positive
doTickWithLocalTimeoutTestImpl(4000, 10000, 14000, 18000, 22000);
// all-negative
doTickWithLocalTimeoutTestImpl(2000, -100000, -98000, -96000, -94000);
// negative to positive missing 0
doTickWithLocalTimeoutTestImpl(500, -950, -450, 50, 550);
// negative to positive striking 0
doTickWithLocalTimeoutTestImpl(3000, -6000, -3000, 1, 3001);
}
private void doTickWithLocalTimeoutTestImpl(int localTimeout, long tick1, long expectedDeadline1, long expectedDeadline2, long expectedDeadline3)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
Object sentOpenFrame = transport.writes.get(0);
assertNotNull("should have written a non-empty frame", sentOpenFrame);
assertTrue("should have written an open frame", sentOpenFrame instanceof Open);
assertEquals("should have had an idletimeout value half our actual timeout", UnsignedInteger.valueOf(localTimeout / 2), ((Open)sentOpenFrame).getIdleTimeOut());
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting their open, without timeout.
Open open = new Open();
open.setIdleTimeOut(null);
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(tick1);
assertEquals("Unexpected deadline returned", expectedDeadline1, deadline);
// Wait for less time than the deadline with no data - get the same value
long interimTick = tick1 + 10;
assertTrue (interimTick < expectedDeadline1);
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", expectedDeadline1, transport.tick(interimTick));
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(expectedDeadline1);
assertEquals("When the deadline has been reached expected a new local deadline to be returned", expectedDeadline2, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
pumpMockTransport(transport);
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(expectedDeadline2);
assertEquals("When the deadline has been reached expected a new local deadline to be returned", expectedDeadline3, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
pumpMockTransport(transport);
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(expectedDeadline3); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 2, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(1));
assertTrue("should have written a close frame", transport.writes.get(1) instanceof Close);
}
@Test
public void testTickWithRemoteTimeout()
{
// all-positive
doTickWithRemoteTimeoutTestImpl(4000, 10000, 14000, 18000, 22000);
// all-negative
doTickWithRemoteTimeoutTestImpl(2000, -100000, -98000, -96000, -94000);
// negative to positive missing 0
doTickWithRemoteTimeoutTestImpl(500, -950, -450, 50, 550);
// negative to positive striking 0
doTickWithRemoteTimeoutTestImpl(3000, -6000, -3000, 1, 3001);
}
private void doTickWithRemoteTimeoutTestImpl(int remoteTimeoutHalf, long tick1, long expectedDeadline1, long expectedDeadline2, long expectedDeadline3)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
Object sentOpenFrame = transport.writes.get(0);
assertNotNull("should have written a non-empty frame", sentOpenFrame);
assertTrue("should have written an open frame", sentOpenFrame instanceof Open);
assertNull("should not have had an idletimeout value", ((Open)sentOpenFrame).getIdleTimeOut());
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(tick1);
assertEquals("Unexpected deadline returned", expectedDeadline1, deadline);
// Wait for less time than the deadline with no data - get the same value
long interimTick = tick1 + 10;
assertTrue (interimTick < expectedDeadline1);
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", expectedDeadline1, transport.tick(interimTick));
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
deadline = transport.tick(expectedDeadline1);
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", expectedDeadline2, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
pumpMockTransport(transport);
// Do some actual work, create real traffic, removing the need to send empty frame to satisfy idle-timeout
connection.session().open();
pumpMockTransport(transport);
assertEquals("session open should have written data", 3, transport.writes.size());
Object sentBeginFrame = transport.writes.get(2);
assertNotNull("should have written a non-empty frame", sentBeginFrame);
assertTrue("session open should have written a Begin frame", sentBeginFrame instanceof Begin);
deadline = transport.tick(expectedDeadline2);
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", expectedDeadline3, deadline);
assertEquals("tick() should not have written data as there was actual activity", 3, transport.writes.size());
pumpMockTransport(transport);
transport.tick(expectedDeadline3);
assertEquals("tick() should have written data", 4, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
}
@Test
public void testTickWithBothTimeouts()
{
// all-positive
doTickWithBothTimeoutsTestImpl(true, 5000, 2000, 10000, 12000, 14000, 15000);
doTickWithBothTimeoutsTestImpl(false, 5000, 2000, 10000, 12000, 14000, 15000);
// all-negative
doTickWithBothTimeoutsTestImpl(true, 10000, 4000, -100000, -96000, -92000, -90000);
doTickWithBothTimeoutsTestImpl(false, 10000, 4000, -100000, -96000, -92000, -90000);
// negative to positive missing 0
doTickWithBothTimeoutsTestImpl(true, 500, 200, -450, -250, -50, 50);
doTickWithBothTimeoutsTestImpl(false, 500, 200, -450, -250, -50, 50);
// negative to positive striking 0 with local deadline
doTickWithBothTimeoutsTestImpl(true, 500, 200, -500, -300, -100, 1);
doTickWithBothTimeoutsTestImpl(false, 500, 200, -500, -300, -100, 1);
// negative to positive striking 0 with remote deadline
doTickWithBothTimeoutsTestImpl(true, 500, 200, -200, 1, 201, 300);
doTickWithBothTimeoutsTestImpl(false, 500, 200, -200, 1, 201, 300);
}
private void doTickWithBothTimeoutsTestImpl(boolean allowLocalTimeout, int localTimeout, int remoteTimeoutHalf, long tick1,
long expectedDeadline1, long expectedDeadline2, long expectedDeadline3)
{
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(0));
assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(tick1);
assertEquals("Unexpected deadline returned", expectedDeadline1, deadline);
// Wait for less time than the deadline with no data - get the same value
long interimTick = tick1 + 10;
assertTrue (interimTick < expectedDeadline1);
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", expectedDeadline1, transport.tick(interimTick));
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
deadline = transport.tick(expectedDeadline1);
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", expectedDeadline2, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
pumpMockTransport(transport);
deadline = transport.tick(expectedDeadline2);
assertEquals("When the deadline has been reached expected a new local deadline to be returned", expectedDeadline3, deadline);
assertEquals("tick() should have written data", 3, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
pumpMockTransport(transport);
if(allowLocalTimeout) {
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(expectedDeadline3); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 4, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(3));
assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
} else {
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(expectedDeadline3);
assertEquals("Receiving data should have reset the deadline (to the next remote one)", expectedDeadline2 + (remoteTimeoutHalf), deadline);
assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
}
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemote()
{
doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(false);
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteWithLocalTimeout()
{
doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(true);
}
private void doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(boolean allowLocalTimeout) {
int localTimeout = 5000;
int remoteTimeoutHalf = 2000;
assertTrue(remoteTimeoutHalf < localTimeout);
long offset = 2500;
assertTrue(offset < localTimeout);
assertTrue(offset > remoteTimeoutHalf);
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(0));
assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(Long.MAX_VALUE - offset);
assertEquals("Unexpected deadline returned", Long.MAX_VALUE - offset + remoteTimeoutHalf, deadline);
deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MAX_VALUE -offset + remoteTimeoutHalf, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
deadline = transport.tick(Long.MAX_VALUE -offset + remoteTimeoutHalf); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
pumpMockTransport(transport);
deadline = transport.tick(Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1); // Wait for the deadline - next deadline should be orig + localTimeout;
assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
assertEquals("tick() should have written data", 3, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
pumpMockTransport(transport);
if(allowLocalTimeout) {
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 4, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(3));
assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
} else {
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
}
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocal()
{
doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(false);
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalWithLocalTimeout()
{
doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(true);
}
private void doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(boolean allowLocalTimeout) {
int localTimeout = 2000;
int remoteTimeoutHalf = 5000;
assertTrue(localTimeout < remoteTimeoutHalf);
long offset = 2500;
assertTrue(offset > localTimeout);
assertTrue(offset < remoteTimeoutHalf);
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(0));
assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(Long.MAX_VALUE - offset);
assertEquals("Unexpected deadline returned", Long.MAX_VALUE - offset + localTimeout, deadline);
deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MAX_VALUE - offset + localTimeout, deadline);
assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MAX_VALUE - offset + localTimeout); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
assertEquals("tick() should not have written data", 1, transport.writes.size());
pumpMockTransport(transport);
if(allowLocalTimeout) {
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 2, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(1));
assertTrue("should have written a close frame", transport.writes.get(1) instanceof Close);
} else {
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + remoteTimeoutHalf - offset -1, deadline);
assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
deadline = transport.tick(Long.MIN_VALUE + remoteTimeoutHalf - offset -1); // Wait for the deadline - next deadline should be orig + 3* localTimeout;
assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (3* localTimeout) - offset -1, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
}
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirst()
{
doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(false);
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstWithLocalTimeout()
{
doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(true);
}
private void doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(boolean allowLocalTimeout) {
int localTimeout = 2000;
int remoteTimeoutHalf = 2500;
assertTrue(localTimeout < remoteTimeoutHalf);
long offset = 500;
assertTrue(offset < localTimeout);
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
pumpMockTransport(transport);
assertEquals("should have written data", 1, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(0));
assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
// Receive Protocol header
processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(Long.MAX_VALUE - offset);
assertEquals("Unexpected deadline returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
pumpMockTransport(transport);
if(allowLocalTimeout) {
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 3, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(2));
assertTrue("should have written a close frame", transport.writes.get(2) instanceof Close);
} else {
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + 2*remoteTimeoutHalf;
assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
assertEquals("tick() shouldn't have written data", 2, transport.writes.size());
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
}
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirst()
{
doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(false);
}
@Test
public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstWithLocalTimeout()
{
doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(true);
}
private void doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(boolean allowLocalTimeout) {
int localTimeout = 5000;
int remoteTimeoutHalf = 2000;
assertTrue(remoteTimeoutHalf < localTimeout);
long offset = 500;
assertTrue(offset < remoteTimeoutHalf);
MockTransportImpl transport = new MockTransportImpl();
Connection connection = Proton.connection();
transport.bind(connection);
// Set our local idleTimeout
transport.setIdleTimeout(localTimeout);
connection.open();
while(transport.pending() > 0) {
transport.pop(transport.head().remaining());
}
assertEquals("should have written data", 1, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(0));
assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
// Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
// if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
Open open = new Open();
open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
transport.handleFrame(openFrame);
pumpMockTransport(transport);
long deadline = transport.tick(Long.MAX_VALUE - offset);
assertEquals("Unexpected deadline returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf, deadline);
assertEquals("tick() should have written data", 2, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
pumpMockTransport(transport);
deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf); // Wait for the deadline - next deadline should be orig + localTimeout;
assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
assertEquals("tick() should have written data", 3, transport.writes.size());
assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
pumpMockTransport(transport);
if(allowLocalTimeout) {
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
assertEquals("tick() should have written data", 4, transport.writes.size());
assertNotNull("should have written a non-empty frame", transport.writes.get(3));
assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
} else {
// Receive Empty frame to satisfy local deadline
processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
}
}
@Test
public void testMaxFrameSizeOfPeerHasEffect()
{
doMaxFrameSizeTestImpl(0, 0, 5700, 1);
doMaxFrameSizeTestImpl(1024, 0, 5700, 6);
}
@Test
public void testMaxFrameSizeOutgoingFrameSizeLimitHasEffect()
{
doMaxFrameSizeTestImpl(0, 512, 5700, 12);
doMaxFrameSizeTestImpl(1024, 512, 5700, 12);
doMaxFrameSizeTestImpl(1024, 2048, 5700, 6);
}
void doMaxFrameSizeTestImpl(int remoteMaxFrameSize, int outboundFrameSizeLimit, int contentLength, int expectedNumFrames)
{
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
// If we have been given an outboundFrameSizeLimit, configure it
if(outboundFrameSizeLimit != 0) {
transport.setOutboundFrameSizeLimit(outboundFrameSizeLimit);
}
Connection connection = Proton.connection();
transport.bind(connection);
Session session = connection.session();
session.open();
String linkName = "mySender";
Sender sender = session.sender(linkName);
sender.open();
String messageContent = createLargeContent(contentLength);
sendMessage(sender, "tag1", messageContent);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
// Now open the connection, expect the Open and Begin frames but
// nothing else as we haven't opened the receiver itself yet.
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
// Send the necessary responses to open/begin/attach then give sender credit
Open open = new Open();
if(remoteMaxFrameSize != 0) {
open.setMaxFrameSize(UnsignedInteger.valueOf(remoteMaxFrameSize));
}
transport.handleFrame(new TransportFrame(0, open, null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach = new Attach();
attach.setHandle(UnsignedInteger.ZERO);
attach.setRole(Role.RECEIVER);
attach.setName(linkName);
attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach, null));
Flow flow = new Flow();
flow.setHandle(UnsignedInteger.ZERO);
flow.setDeliveryCount(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ONE);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 3, transport.writes.size());
// Now pump the transport again and expect transfers for the message
pumpMockTransport(transport);
// This calc isn't entirely precise, there is some added performative/frame overhead not
// accounted for...but values are chosen to work, and verified here.
final int frameCount;
if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit == 0) {
frameCount = 1;
} else if(remoteMaxFrameSize == 0 && outboundFrameSizeLimit != 0) {
frameCount = (int) Math.ceil((double)contentLength / (double) outboundFrameSizeLimit);
} else {
int effectiveMaxFrameSize;
if(outboundFrameSizeLimit != 0) {
effectiveMaxFrameSize = Math.min(outboundFrameSizeLimit, remoteMaxFrameSize);
} else {
effectiveMaxFrameSize = remoteMaxFrameSize;
}
frameCount = (int) Math.ceil((double)contentLength / (double) effectiveMaxFrameSize);
}
assertEquals("Unexpected number of frames calculated", expectedNumFrames, frameCount);
final int start = 3;
final int totalExpected = start + frameCount;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), totalExpected, transport.writes.size());
for(int i = start; i < totalExpected; i++) {
assertTrue("Unexpected frame type", transport.writes.get(i) instanceof Transfer);
}
}
private void processInput(MockTransportImpl transport, ByteBuffer data) {
while (data.remaining() > 0)
{
int origLimit = data.limit();
int amount = Math.min(transport.tail().remaining(), data.remaining());
data.limit(data.position() + amount);
transport.tail().put(data);
data.limit(origLimit);
transport.process();
}
}
private static String createLargeContent(int length) {
Random rand = new Random(System.currentTimeMillis());
byte[] payload = new byte[length];
for (int i = 0; i < length; i++) {
payload[i] = (byte) (64 + 1 + rand.nextInt(9));
}
return new String(payload, StandardCharsets.UTF_8);
}
@Test
public void testMultiplexMultiFrameDeliveryOnSingleSessionOutgoing() {
doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(false);
}
@Test
public void testMultiplexMultiFrameDeliveriesOnSingleSessionOutgoing() {
doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(true);
}
private void doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(boolean bothDeliveriesMultiFrame) {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
int contentLength1 = 6000;
int frameSizeLimit = 4000;
int contentLength2 = 2000;
if(bothDeliveriesMultiFrame) {
contentLength2 = 6000;
}
Connection connection = Proton.connection();
transport.bind(connection);
Session session = connection.session();
session.open();
String linkName = "mySender1";
Sender sender = session.sender(linkName);
sender.open();
String linkName2 = "mySender2";
Sender sender2 = session.sender(linkName2);
sender2.open();
String messageContent1 = createLargeContent(contentLength1);
sendMessage(sender, "tag1", messageContent1);
String messageContent2 = createLargeContent(contentLength2);
sendMessage(sender2, "tag2", messageContent2);
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
// Now open the connection, expect the Open Begin, and Attach frames but
// nothing else as we haven't remotely opened the receiver or given credit yet.
connection.open();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
// Send the necessary responses to open/begin/attach then give senders credit
Open open = new Open();
open.setMaxFrameSize(UnsignedInteger.valueOf(frameSizeLimit));
transport.handleFrame(new TransportFrame(0, open, null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(UnsignedInteger.ZERO);
attach1.setRole(Role.RECEIVER);
attach1.setName(linkName);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
Attach attach2 = new Attach();
attach2.setHandle(UnsignedInteger.ONE);
attach2.setRole(Role.RECEIVER);
attach2.setName(linkName2);
attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach2, null));
Flow flow1 = new Flow();
flow1.setHandle(UnsignedInteger.ZERO);
flow1.setDeliveryCount(UnsignedInteger.ZERO);
flow1.setNextIncomingId(UnsignedInteger.ONE);
flow1.setNextOutgoingId(UnsignedInteger.ZERO);
flow1.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow1.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow1.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow1, null));
Flow flow2 = new Flow();
flow2.setHandle(UnsignedInteger.ONE);
flow2.setDeliveryCount(UnsignedInteger.ZERO);
flow2.setNextIncomingId(UnsignedInteger.ONE);
flow2.setNextOutgoingId(UnsignedInteger.ZERO);
flow2.setIncomingWindow(UnsignedInteger.valueOf(1024));
flow2.setOutgoingWindow(UnsignedInteger.valueOf(1024));
flow2.setLinkCredit(UnsignedInteger.valueOf(10));
transport.handleFrame(new TransportFrame(0, flow2, null));
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
// Now pump the transport again and expect transfers for the messages
pumpMockTransport(transport);
int expectedFrames = bothDeliveriesMultiFrame ? 8 : 7;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), expectedFrames, transport.writes.size());
FrameBody frameBody = transport.writes.get(4);
assertTrue("Unexpected frame type", frameBody instanceof Transfer);
Transfer transfer = (Transfer) frameBody;
assertEquals("Unexpected delivery tag", new Binary("tag1".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
assertEquals("Unexpected deliveryId", UnsignedInteger.ZERO, transfer.getDeliveryId());
assertEquals("Unexpected more flag", true, transfer.getMore());
frameBody = transport.writes.get(5);
assertTrue("Unexpected frame type", frameBody instanceof Transfer);
transfer = (Transfer) frameBody;
assertEquals("Unexpected delivery tag", new Binary("tag2".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
assertEquals("Unexpected deliveryId", UnsignedInteger.ONE, transfer.getDeliveryId());
assertEquals("Unexpected more flag", bothDeliveriesMultiFrame, transfer.getMore());
frameBody = transport.writes.get(6);
assertTrue("Unexpected frame type", frameBody instanceof Transfer);
transfer = (Transfer) frameBody;
assertEquals("Unexpected delivery tag", new Binary("tag1".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
assertEquals("Unexpected deliveryId", UnsignedInteger.ZERO, transfer.getDeliveryId());
assertEquals("Unexpected more flag", false, transfer.getMore());
if(bothDeliveriesMultiFrame) {
frameBody = transport.writes.get(7);
assertTrue("Unexpected frame type", frameBody instanceof Transfer);
transfer = (Transfer) frameBody;
assertEquals("Unexpected delivery tag", new Binary("tag2".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
assertEquals("Unexpected deliveryId", UnsignedInteger.ONE, transfer.getDeliveryId());
assertEquals("Unexpected more flag", false, transfer.getMore());
}
}
@Test
public void testMultiplexMultiFrameDeliveriesOnSingleSessionIncoming() {
doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(true);
}
@Test
public void testMultiplexMultiFrameDeliveryOnSingleSessionIncoming() {
doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(false);
}
private void doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(boolean bothDeliveriesMultiFrame) {
int contentLength1 = 7000;
int maxPayloadChunkSize = 2000;
int contentLength2 = 1000;
if(bothDeliveriesMultiFrame) {
contentLength2 = 4100;
}
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
String linkName2 = "myReceiver2";
Receiver receiver2 = session.receiver(linkName2);
receiver2.flow(5);
receiver2.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
final UnsignedInteger r2handle = UnsignedInteger.ONE;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
FrameBody frame = transport.writes.get(2);
assertTrue("Unexpected frame type", frame instanceof Attach);
assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r1handle);
frame = transport.writes.get(3);
assertTrue("Unexpected frame type", frame instanceof Attach);
assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r2handle);
frame = transport.writes.get(4);
assertTrue("Unexpected frame type", frame instanceof Flow);
assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r1handle);
frame = transport.writes.get(5);
assertTrue("Unexpected frame type", frame instanceof Flow);
assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r2handle);
assertNull("Should not yet have a delivery", receiver1.current());
assertNull("Should not yet have a delivery", receiver2.current());
// Send the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
Attach attach2 = new Attach();
attach2.setHandle(r2handle);
attach2.setRole(Role.SENDER);
attach2.setName(linkName2);
attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach2, null));
String deliveryTag1 = "tag1";
String messageContent1 = createLargeContent(contentLength1);
String deliveryTag2 = "tag2";
String messageContent2 = createLargeContent(contentLength2);
ArrayList<byte[]> message1chunks = createTransferPayloads(messageContent1, maxPayloadChunkSize);
assertEquals("unexpected number of payload chunks", 4, message1chunks.size());
ArrayList<byte[]> message2chunks = createTransferPayloads(messageContent2, maxPayloadChunkSize);
if(bothDeliveriesMultiFrame) {
assertEquals("unexpected number of payload chunks", 3, message2chunks.size());
} else {
assertEquals("unexpected number of payload chunks", 1, message2chunks.size());
}
while (true) {
if (!message1chunks.isEmpty()) {
byte[] chunk = message1chunks.remove(0);
handlePartialTransfer(transport, r1handle, 1, deliveryTag1, chunk, !message1chunks.isEmpty());
}
if (!message2chunks.isEmpty()) {
byte[] chunk = message2chunks.remove(0);
handlePartialTransfer(transport, r2handle, 2, deliveryTag2, chunk, !message2chunks.isEmpty());
}
if (message1chunks.isEmpty() && message2chunks.isEmpty()) {
break;
}
}
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
assertEquals("Unexpected queued count", 1, receiver1.getQueued());
Delivery delivery1 = verifyDelivery(receiver1, deliveryTag1, messageContent1);
assertNotNull("Should now have a delivery", delivery1);
assertEquals("Unexpected queued count", 0, receiver1.getQueued());
assertEquals("Unexpected queued count", 1, receiver2.getQueued());
Delivery delivery2 = verifyDelivery(receiver2, deliveryTag2, messageContent2);
assertNotNull("Should now have a delivery", delivery2);
assertEquals("Unexpected queued count", 0, receiver2.getQueued());
delivery1.disposition(Accepted.getInstance());
delivery1.settle();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
frame = transport.writes.get(6);
assertTrue("Unexpected frame type", frame instanceof Disposition);
assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.ONE);
assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.ONE);
delivery2.disposition(Accepted.getInstance());
delivery2.settle();
pumpMockTransport(transport);
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 8, transport.writes.size());
frame = transport.writes.get(7);
assertTrue("Unexpected frame type", frame instanceof Disposition);
assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.valueOf(2));
assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
}
private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
{
handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
}
private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
{
handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, false);
}
private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
{
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Transfer transfer = new Transfer();
transfer.setHandle(handle);
transfer.setDeliveryTag(new Binary(tag));
transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
transfer.setMore(more);
transfer.setAborted(aborted);
if(deliveryId != null) {
// Can be omitted in continuation frames for a given delivery.
transfer.setDeliveryId(deliveryId);
}
transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
}
private ArrayList<byte[]> createTransferPayloads(String content, int payloadChunkSize)
{
ArrayList<byte[]> payloadChunks = new ArrayList<>();
Message m = Message.Factory.create();
m.setBody(new AmqpValue(content));
byte[] encoded = new byte[BUFFER_SIZE];
int len = m.encode(encoded, 0, BUFFER_SIZE);
assertTrue("given array was too small", len < BUFFER_SIZE);
int copied = 0;
while(copied < len) {
int chunkSize = Math.min(len - copied, payloadChunkSize);
byte[] chunk = new byte[chunkSize];
System.arraycopy(encoded, copied, chunk, 0, chunkSize);
payloadChunks.add(chunk);
copied += chunkSize;
}
assertFalse("no payload chunks to return", payloadChunks.isEmpty());
return payloadChunks;
}
@Test
public void testDeliveryIdOutOfSequenceCausesISE() {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
String linkName2 = "myReceiver2";
Receiver receiver2 = session.receiver(linkName2);
receiver2.flow(5);
receiver2.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
final UnsignedInteger r2handle = UnsignedInteger.ONE;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
Attach attach2 = new Attach();
attach2.setHandle(r2handle);
attach2.setRole(Role.SENDER);
attach2.setName(linkName2);
attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach2, null));
String deliveryTag1 = "tag1";
String deliveryTag2 = "tag2";
handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 2 }, false);
try {
handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, false);
fail("Expected an ISE");
} catch(IllegalStateException ise) {
// Expected
assertTrue("Unexpected exception:" + ise, ise.getMessage().contains("Expected delivery-id 3, got 1"));
}
}
@Test
public void testDeliveryIdMissingOnInitialTransferCausesISE() {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
// Receive a delivery without any delivery-id on the [first] transfer frame, expect it to fail.
try {
handlePartialTransfer(transport, r1handle, null, "tag1", new byte[]{ 1 }, false);
fail("Expected an ISE");
} catch(IllegalStateException ise) {
// Expected
assertEquals("Unexpected message", "No delivery-id specified on first Transfer of new delivery", ise.getMessage());
}
}
@Test
public void testMultiplexDeliveriesOnSameReceiverLinkCausesISE() {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
// Receive first transfer for a multi-frame delivery
handlePartialTransfer(transport, r1handle, 1, "tag1", new byte[]{ 1 }, true);
// Receive first transfer for ANOTHER multi-frame delivery, expect it to fail
// as multiplexing deliveries on a single link is forbidden by the spec.
try {
handlePartialTransfer(transport, r1handle, 2, "tag2", new byte[]{ 2 }, true);
fail("Expected an ISE");
} catch(IllegalStateException ise) {
// Expected
assertEquals("Unexpected message", "Illegal multiplex of deliveries on same link with delivery-id 1 and 2", ise.getMessage());
}
}
@Test
public void testDeliveryIdTrackingHandlesAbortedDelivery() {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
// Receive first transfer for a multi-frame delivery
assertEquals("Unexpected queued count", 0, receiver1.getQueued());
handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 1 }, true);
assertEquals("Unexpected queued count", 1, receiver1.getQueued());
// Receive second transfer for a multi-frame delivery, indicating it is aborted
handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 2 }, true, true);
assertEquals("Unexpected queued count", 1, receiver1.getQueued());
// Receive first transfer for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, "tag2", new byte[]{ 3 }, false);
assertEquals("Unexpected queued count", 2, receiver1.getQueued());
receiver1.advance();
verifyDeliveryRawPayload(receiver1, "tag2", new byte[] { 3 });
}
@Test
public void testDeliveryWithIdOmittedOnContinuationTransfers() {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
String linkName2 = "myReceiver2";
Receiver receiver2 = session.receiver(linkName2);
receiver2.flow(5);
receiver2.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
final UnsignedInteger r2handle = UnsignedInteger.ONE;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
Attach attach2 = new Attach();
attach2.setHandle(r2handle);
attach2.setRole(Role.SENDER);
attach2.setName(linkName2);
attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach2, null));
String deliveryTag1 = "tag1";
String deliveryTag2 = "tag2";
// Send multi-frame deliveries for each link, multiplexed together, and omit
// the delivery-id on the continuation frames as allowed for by the spec.
handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, true);
handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 101 }, true);
handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 102 }, true);
handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 2 }, true);
handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 3 }, false);
handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 103 }, true);
handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 104 }, false);
// Verify the transfer frames were all matched to compose the expected delivery payload.
verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2, 3 });
verifyDeliveryRawPayload(receiver2, deliveryTag2, new byte[] { 101, 102, 103, 104 });
}
@Test
public void testDeliveryIdThresholdsAndWraps() {
// Check start from 0
doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.ZERO, UnsignedInteger.ONE, UnsignedInteger.valueOf(2));
// Check run up to max-int (interesting boundary for underlying impl)
doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(Integer.MAX_VALUE - 2), UnsignedInteger.valueOf(Integer.MAX_VALUE -1), UnsignedInteger.valueOf(Integer.MAX_VALUE));
// Check crossing from signed range value into unsigned range value (interesting boundary for underlying impl)
long maxIntAsLong = Integer.MAX_VALUE;
doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(maxIntAsLong), UnsignedInteger.valueOf(maxIntAsLong + 1L), UnsignedInteger.valueOf(maxIntAsLong + 2L));
// Check run up to max-uint
doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(0xFFFFFFFFL - 2), UnsignedInteger.valueOf(0xFFFFFFFFL - 1), UnsignedInteger.MAX_VALUE);
// Check wrapping from max unsigned value back to min(/0).
doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.MAX_VALUE, UnsignedInteger.ZERO, UnsignedInteger.ONE);
}
private void doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger deliveryId1, UnsignedInteger deliveryId2, UnsignedInteger deliveryId3) {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
Connection connection = Proton.connection();
transport.bind(connection);
connection.open();
Session session = connection.session();
session.open();
String linkName1 = "myReceiver1";
Receiver receiver1 = session.receiver(linkName1);
receiver1.flow(5);
receiver1.open();
pumpMockTransport(transport);
final UnsignedInteger r1handle = UnsignedInteger.ZERO;
assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
// Give the necessary responses to open/begin/attach
transport.handleFrame(new TransportFrame(0, new Open(), null));
Begin begin = new Begin();
begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
begin.setNextOutgoingId(UnsignedInteger.ONE);
begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
transport.handleFrame(new TransportFrame(0, begin, null));
Attach attach1 = new Attach();
attach1.setHandle(r1handle);
attach1.setRole(Role.SENDER);
attach1.setName(linkName1);
attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
transport.handleFrame(new TransportFrame(0, attach1, null));
String deliveryTag1 = "tag1";
String deliveryTag2 = "tag2";
String deliveryTag3 = "tag3";
// Send deliveries with the given delivery-id
handlePartialTransfer(transport, r1handle, deliveryId1, deliveryTag1, new byte[]{ 1 }, false);
handlePartialTransfer(transport, r1handle, deliveryId2, deliveryTag2, new byte[]{ 2 }, false);
handlePartialTransfer(transport, r1handle, deliveryId3, deliveryTag3, new byte[]{ 3 }, false);
// Verify deliveries arrived with expected payload
verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1 });
verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
}
}