blob: cb167f8e8e41d3dc5ea304a6b7d5eb709383d7ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.proton.systemtests;
import static java.util.EnumSet.of;
import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.junit.Ignore;
import org.junit.Test;
/**
* Simple example to illustrate the use of the Engine and Message APIs.
*
* Implemented as a JUnit test for convenience, although the main purpose is to educate the reader
* rather than test the code.
*
* To see the protocol trace, add the following line to test/resources/logging.properties:
*
* org.apache.qpid.proton.logging.LoggingProtocolTracer.sent.level = ALL
*
* and to see the byte level trace, add the following:
*
* org.apache.qpid.proton.systemtests.ProtonEngineExampleTest.level = ALL
*
* Does not illustrate use of the Messenger API.
*/
public class ProtonEngineExampleTest extends EngineTestBase
{
private static final Logger LOGGER = Logger.getLogger(ProtonEngineExampleTest.class.getName());
private static final int BUFFER_SIZE = 4096;
private final String _targetAddress = getServer().containerId + "-link1-target";
@Test
public void test() throws Exception
{
LOGGER.fine(bold("======== About to create transports"));
getClient().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
getServer().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
doOutputInputCycle();
getClient().connection = Proton.connection();
getClient().transport.bind(getClient().connection);
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open sessions"));
getClient().session = getClient().connection.session();
getClient().session.open();
pumpClientToServer();
getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
getServer().session.open();
assertEndpointState(getServer().session, ACTIVE, ACTIVE);
pumpServerToClient();
assertEndpointState(getClient().session, ACTIVE, ACTIVE);
LOGGER.fine(bold("======== About to create sender"));
getClient().source = new Source();
getClient().source.setAddress(null);
getClient().target = new Target();
getClient().target.setAddress(_targetAddress);
getClient().sender = getClient().session.sender("link1");
getClient().sender.setTarget(getClient().target);
getClient().sender.setSource(getClient().source);
// Exactly once delivery semantics
getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
getClient().sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);
assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
getClient().sender.open();
assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
pumpClientToServer();
LOGGER.fine(bold("======== About to set up implicitly created receiver"));
// A real application would be interested in more states than simply ACTIVE, as there
// exists the possibility that the link could have moved to another state already e.g. CLOSED.
// (See pipelining).
getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
// Accept the settlement modes suggested by the client
getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
assertTerminusEquals(getClient().target, serverRemoteTarget);
getServer().receiver.setTarget(applicationDeriveTarget(serverRemoteTarget));
assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
getServer().receiver.open();
assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
pumpServerToClient();
assertEndpointState(getClient().sender, ACTIVE, ACTIVE);
getServer().receiver.flow(1);
pumpServerToClient();
LOGGER.fine(bold("======== About to create a message and send it to the server"));
getClient().message = Proton.message();
Section messageBody = new AmqpValue("Hello");
getClient().message.setBody(messageBody);
getClient().messageData = new byte[BUFFER_SIZE];
int lengthOfEncodedMessage = getClient().message.encode(getClient().messageData, 0, BUFFER_SIZE);
getTestLoggingHelper().prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(getClient().messageData, lengthOfEncodedMessage));
byte[] deliveryTag = "delivery1".getBytes();
getClient().delivery = getClient().sender.delivery(deliveryTag);
int numberOfBytesAcceptedBySender = getClient().sender.send(getClient().messageData, 0, lengthOfEncodedMessage);
assertEquals("For simplicity, assume the sender can accept all the data",
lengthOfEncodedMessage, numberOfBytesAcceptedBySender);
assertNull(getClient().delivery.getLocalState());
boolean senderAdvanced = getClient().sender.advance();
assertTrue("sender has not advanced", senderAdvanced);
pumpClientToServer();
LOGGER.fine(bold("======== About to process the message on the server"));
getServer().delivery = getServer().connection.getWorkHead();
assertEquals("The received delivery should be on our receiver",
getServer().receiver, getServer().delivery.getLink());
assertNull(getServer().delivery.getLocalState());
assertNull(getServer().delivery.getRemoteState());
assertFalse(getServer().delivery.isPartial());
assertTrue(getServer().delivery.isReadable());
getServer().messageData = new byte[BUFFER_SIZE];
int numberOfBytesProducedByReceiver = getServer().receiver.recv(getServer().messageData, 0, BUFFER_SIZE);
assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver);
getServer().message = Proton.message();
getServer().message.decode(getServer().messageData, 0, numberOfBytesProducedByReceiver);
boolean messageProcessed = applicationProcessMessage(getServer().message);
assertTrue(messageProcessed);
getServer().delivery.disposition(Accepted.getInstance());
assertEquals(Accepted.getInstance(), getServer().delivery.getLocalState());
pumpServerToClient();
assertEquals(Accepted.getInstance(), getClient().delivery.getRemoteState());
LOGGER.fine(bold("======== About to accept and settle the message on the client"));
Delivery clientDelivery = getClient().connection.getWorkHead();
assertEquals(getClient().delivery, clientDelivery);
assertTrue(clientDelivery.isUpdated());
assertEquals(getClient().sender, clientDelivery.getLink());
clientDelivery.disposition(clientDelivery.getRemoteState());
assertEquals(Accepted.getInstance(), getClient().delivery.getLocalState());
clientDelivery.settle();
assertNull("Now we've settled, the delivery should no longer be in the work list", getClient().connection.getWorkHead());
pumpClientToServer();
LOGGER.fine(bold("======== About to settle the message on the server"));
assertEquals(Accepted.getInstance(), getServer().delivery.getRemoteState());
Delivery serverDelivery = getServer().connection.getWorkHead();
assertEquals(getServer().delivery, serverDelivery);
assertTrue(serverDelivery.isUpdated());
assertTrue("Client should have already settled", serverDelivery.remotelySettled());
serverDelivery.settle();
assertTrue(serverDelivery.isSettled());
assertNull("Now we've settled, the delivery should no longer be in the work list", getServer().connection.getWorkHead());
// Increment the receiver's credit so its ready for another message.
// When using proton-c, this call is required in order to generate a Flow frame
// (proton-j sends one even without it to eagerly restore the session incoming window).
getServer().receiver.flow(1);
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's sender"));
getClient().sender.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's link closure"));
assertSame(getServer().receiver, getServer().connection.linkHead(of(ACTIVE), of(CLOSED)));
getServer().receiver.close();
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's session"));
getClient().session.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's session closure"));
assertSame(getServer().session, getServer().connection.sessionHead(of(ACTIVE), of(CLOSED)));
getServer().session.close();
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's connection"));
getClient().connection.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's connection closure"));
assertEquals(CLOSED, getServer().connection.getRemoteState());
getServer().connection.close();
pumpServerToClient();
LOGGER.fine(bold("======== Checking client has nothing more to pump"));
assertClientHasNothingToOutput();
LOGGER.fine(bold("======== Done!"));
}
@Ignore("This test does not have a fix yet")
@Test
public void testPROTON_1017() throws Exception
{
LOGGER.fine(bold("======== About to create transports"));
getClient().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
getServer().transport = Proton.transport();
ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
doOutputInputCycle();
getClient().connection = Proton.connection();
getClient().transport.bind(getClient().connection);
getServer().connection = Proton.connection();
getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
getClient().connection.open();
getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open and close client session"));
getClient().session = getClient().connection.session();
getClient().session.open();
getClient().session.close();
pumpClientToServer();
getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(CLOSED));
assertEndpointState(getServer().session, UNINITIALIZED, CLOSED);
getServer().session.open();
assertEndpointState(getServer().session, ACTIVE, CLOSED);
getServer().session.close();
assertEndpointState(getServer().session, CLOSED, CLOSED);
pumpServerToClient();
assertEndpointState(getClient().session, CLOSED, CLOSED);
LOGGER.fine(bold("======== About to close client's connection"));
getClient().connection.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's connection closure"));
assertEquals(CLOSED, getServer().connection.getRemoteState());
getServer().connection.close();
pumpServerToClient();
LOGGER.fine(bold("======== Checking client has nothing more to pump"));
assertClientHasNothingToOutput();
LOGGER.fine(bold("======== Done!"));
}
/**
* Simulates creating a local terminus using the properties supplied by the remote link endpoint.
*
* In a broker you'd usually overlay serverRemoteTarget (eg its filter properties) onto
* an existing object (which eg contains whether it's a queue or a topic), creating a new one from that
* overlay. Also if this is link recovery then you'd fetch the unsettled map too.
*/
private org.apache.qpid.proton.amqp.transport.Target applicationDeriveTarget(org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget)
{
return serverRemoteTarget;
}
/**
* Simulates processing a message.
*/
private boolean applicationProcessMessage(Message message)
{
Object messageBody = ((AmqpValue)message.getBody()).getValue();
return "Hello".equals(messageBody);
}
}