QPID-8074: [JMS AMQP 0-x][System Tests] Move AMQP 0-x client specific ProducerFlowControlTest from broker-j
diff --git a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
index 2db51a7..6253fe3 100644
--- a/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java
@@ -397,7 +397,7 @@
}
@SuppressWarnings("unused")
- Map<String, Object> readEntityUsingAmqpManagement(final String name,
+ public Map<String, Object> readEntityUsingAmqpManagement(final String name,
final String type,
final boolean actuals,
final Session session)
diff --git a/systests/src/test/java/org/apache/qpid/systest/producer/ProducerFlowControlTest.java b/systests/src/test/java/org/apache/qpid/systest/producer/ProducerFlowControlTest.java
new file mode 100644
index 0000000..b7c6ec9
--- /dev/null
+++ b/systests/src/test/java/org/apache/qpid/systest/producer/ProducerFlowControlTest.java
@@ -0,0 +1,154 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.systest.producer;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.util.Collections;
+import java.util.Map;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.systest.core.BrokerAdmin;
+import org.apache.qpid.systest.core.JmsTestBase;
+import org.apache.qpid.systest.core.brokerj.AmqpManagementFacade;
+
+public class ProducerFlowControlTest extends JmsTestBase
+{
+ private Connection _producerConnection;
+ private MessageProducer _producer;
+ private BytesMessage _message;
+ private AmqpManagementFacade _managementFacade;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ assumeThat("Test suite requires amqp management",
+ getBrokerAdmin().getBrokerType(),
+ is(equalTo(BrokerAdmin.BrokerType.BROKERJ)));
+ _managementFacade = new AmqpManagementFacade();
+
+ System.setProperty("qpid.flow_control_wait_failure", "3000");
+ System.setProperty("qpid.flow_control_wait_notify_period", "1000");
+
+ _producerConnection = getConnection(Collections.singletonMap("sync_publish", "all"));
+ _producerConnection.start();
+ final Session producerSession = _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String queueName = getTestQueueName();
+ final Queue queue = createAndBindQueueWithFlowControlEnabled(producerSession, queueName, 1000, 800);
+
+ _producer = producerSession.createProducer(queue);
+ _message = producerSession.createBytesMessage();
+ _message.writeBytes(new byte[1100]);
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ System.clearProperty("qpid.flow_control_wait_failure");
+ System.clearProperty("qpid.flow_control_wait_notify_period");
+ if (_producerConnection != null)
+ {
+ _producerConnection.close();
+ }
+ }
+
+ @Test
+ public void testClientLogMessages() throws Exception
+ {
+ _producer.send(_message);
+
+ final long timeout = System.currentTimeMillis() + getReceiveTimeout();
+ awaitFlowBlocked(timeout);
+
+ // Ensure that the client has processed the incoming flow/messagestop
+ _producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).close();
+ try
+ {
+ _producer.send(_message);
+ fail("Producer should be blocked by flow control");
+ }
+ catch (JMSException e)
+ {
+ final String expectedMsg =
+ "0-10".equals(getProtocol()) ? "Exception when sending message:timed out waiting for message credit"
+ : "Unable to send message for 3 seconds due to broker enforced flow control";
+ assertEquals("Unexpected exception reason", expectedMsg, e.getMessage());
+ }
+ }
+
+ private void awaitFlowBlocked(final long timeout) throws Exception
+ {
+ Connection con = getConnection();
+ con.start();
+ Session session = con.createSession(true, Session.SESSION_TRANSACTED);
+ try
+ {
+ boolean queueFlowStopped;
+ do
+ {
+ Map<String, Object> response = _managementFacade.readEntityUsingAmqpManagement(getTestQueueName(),
+ "org.apache.qpid.Queue",
+ false,
+ session);
+
+ queueFlowStopped = Boolean.parseBoolean(String.valueOf(response.get("queueFlowStopped")));
+ }
+ while (!queueFlowStopped || System.currentTimeMillis() > timeout);
+ assertTrue("Flow did not become blocked within timeout", queueFlowStopped);
+ }
+ finally
+ {
+ con.close();
+ }
+ }
+
+ private Queue createAndBindQueueWithFlowControlEnabled(Session session,
+ String queueName,
+ int capacity,
+ int resumeCapacity) throws Exception
+ {
+ return session.createQueue(String.format(
+ "ADDR:%s; {create: always, node: {x-declare: {arguments:{'x-qpid-capacity': %d,"
+ + " 'x-qpid-flow-resume-capacity': %d}}}}",
+ queueName,
+ capacity,
+ resumeCapacity));
+ }
+}
+
+
+