blob: 7a832f3295d0246f75d00553d6ad4d7cb52ceace [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.transport.TestNetworkConnection;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.url.AMQBindingURL;
public class AMQSession_0_8Test extends QpidTestCase
{
private AMQConnection _connection;
public void setUp() throws Exception
{
super.setUp();
_connection = new MockAMQConnection("amqp://guest:guest@/test?brokerlist='tcp://localhost:5672'");
NetworkConnection network = new TestNetworkConnection();
_connection.getProtocolHandler().setNetworkConnection(network);
}
public void testQueueNameIsGeneratedOnDeclareQueueWithEmptyQueueName() throws Exception
{
final String testQueueName = "tmp_127_0_0_1_1_1";
_connection.setConnectionListener(new MockReceiveConnectionListener(_connection, 1,
new QueueDeclareOkBody(AMQShortString.valueOf(testQueueName), 0, 0)));
AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
AMQBindingURL bindingURL = new AMQBindingURL("topic://amq.topic//?routingkey='testTopic'");
AMQQueue queue = new AMQQueue(bindingURL);
assertEquals("Unexpected queue name", "", queue.getAMQQueueName());
session.declareQueue(queue, true);
assertEquals("Unexpected queue name", testQueueName, queue.getAMQQueueName());
}
public void testResubscribe() throws Exception
{
// to verify producer resubscribe set qpid.declare_exchanges=true
setTestSystemProperty(ClientProperties.QPID_DECLARE_EXCHANGES_PROP_NAME, "true");
setTestSystemProperty(ClientProperties.QPID_DECLARE_QUEUES_PROP_NAME, "false");
setTestSystemProperty(ClientProperties.QPID_BIND_QUEUES_PROP_NAME, "false");
AMQSession_0_8 session = new AMQSession_0_8(_connection, 1, true, 0, 1, 1);
AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'"));
// expecting exchange declare Ok
MockReceiveConnectionListener listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
_connection.setConnectionListener(listener);
session.createProducer(queue1);
assertTrue("Not all expected commands have been sent on producer1 creation", listener.responsesEmpty());
// expecting exchange declare Ok, Channel flow Ok, Consume Ok
listener = new MockReceiveConnectionListener(_connection, 1,
new ExchangeDeclareOkBody(), new ChannelFlowOkBody(false), new BasicConsumeOkBody(AMQShortString.valueOf("1")));
_connection.setConnectionListener(listener );
BasicMessageConsumer_0_8 consumer1 = (BasicMessageConsumer_0_8)session.createConsumer(queue1);
assertTrue("Not all expected commands have been sent on consumer1 creation", listener.responsesEmpty());
// expecting exchange declare Ok
listener = new MockReceiveConnectionListener(_connection, 1, new ExchangeDeclareOkBody());
_connection.setConnectionListener(listener);
AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'"));
session.createProducer(queue2);
assertTrue("Not all expected commands have been sent on producer2 creation", listener.responsesEmpty());
// expecting exchange declare Ok, Consume Ok
listener = new MockReceiveConnectionListener(_connection, 1,
new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2")));
_connection.setConnectionListener(listener);
BasicMessageConsumer_0_8 consumer2 = (BasicMessageConsumer_0_8)session.createConsumer(queue2);
assertTrue("Not all expected commands have been sent on consumer2 creation", listener.responsesEmpty());
UnprocessedMessage[] messages = new UnprocessedMessage[4];
for (int i =0; i< messages.length;i++ )
{
String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag();
int deliveryTag = i + 1;
messages[i]= createMockMessage(deliveryTag, consumerTag);
session.messageReceived(messages[i]);
if (deliveryTag % 2 == 0)
{
session.addDeliveredMessage(deliveryTag);
}
else
{
session.addUnacknowledgedMessage(deliveryTag);
}
}
assertEquals("Unexpected highest delivery tag", messages.length, session.getHighestDeliveryTag().get());
assertFalse("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
listener = new MockReceiveConnectionListener(_connection, 1,
new ExchangeDeclareOkBody(), // first producer resubscribe
new ExchangeDeclareOkBody(), // second producer resubscribe
new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("1")), // first consumer resubscribe
new ExchangeDeclareOkBody(), new BasicConsumeOkBody(AMQShortString.valueOf("2"))); // second consumer resubscribe
_connection.setConnectionListener(listener);
session.resubscribe();
assertTrue("Not all expected commands have been sent on session resubscribe", listener.responsesEmpty());
assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get());
assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty());
assertTrue("Unexpected delivered message tags", session.getDeliveredMessageTags().isEmpty());
assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty());
assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers()));
}
private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag)
{
UnprocessedMessage message = mock(UnprocessedMessage.class);
when(message.getConsumerTag()).thenReturn(consumerTag);
when(message.getDeliveryTag()).thenReturn(deliveryTag);
return message;
}
static class MockReceiveConnectionListener extends ConnectionListenerSupport
{
private final AMQConnection _connection;
private final List<AMQBody> _responses;
private final int _channelId;
MockReceiveConnectionListener(AMQConnection connection, int channelId, AMQBody... response)
{
_connection = connection;
_responses = new ArrayList<>(Arrays.asList(response));
_channelId = channelId;
}
@Override
public void bytesSent(long count)
{
try
{
AMQBody response = _responses.remove(0);
_connection.getProtocolHandler().methodBodyReceived(_channelId, response);
}
catch (QpidException e)
{
throw new RuntimeException(e);
}
}
public boolean responsesEmpty()
{
return _responses.isEmpty();
}
}
}