Merge pull request #357 from bd2019us/AMQ-7199
[AMQ-7199] Replace Math.Random with Random.nextDouble
diff --git a/NOTICE b/NOTICE
index b2e0c43..927e2d2 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,4 +1,4 @@
-Apache ActiveMQ Copyright 2005-2017 Apache Software Foundation
+Apache ActiveMQ Copyright 2005-2019 Apache Software Foundation
This product includes software developed by
The Apache Software Foundation (http://www.apache.org/).
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
index 346a54c..ffe9ccc 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java
@@ -319,7 +319,7 @@
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
- footerMap.put(name, value);
+ footerMap.put(Symbol.valueOf(name), value);
continue;
}
} else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX )) {
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
index 201cee2..deb0c0d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java
@@ -22,6 +22,8 @@
import static org.junit.Assert.assertTrue;
import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -36,6 +38,11 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
@@ -188,6 +195,54 @@
openwireConn.close();
}
+ @Test(timeout = 60000)
+ public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
+ startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
+
+ URI remoteURI = new URI("tcp://" + amqpConnectionURI.getHost() + ":" + amqpConnectionURI.getPort());
+ AmqpClient client = new AmqpClient(remoteURI, null, null);
+ AmqpConnection connection = client.connect();
+ try {
+ connection.connect();
+
+ String annotation = "x-opt-embedded-map";
+ Map<String, String> embeddedMap = new LinkedHashMap<>();
+ embeddedMap.put("test-key-1", "value-1");
+ embeddedMap.put("test-key-2", "value-2");
+ embeddedMap.put("test-key-3", "value-3");
+
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(TEST_QUEUE);
+ AmqpMessage message = createAmqpMessage((byte) 'A', 65535);
+
+ message.setApplicationProperty("IntProperty", 42);
+ message.setDurable(true);
+ message.setMessageAnnotation(annotation, embeddedMap);
+ sender.send(message);
+
+ session.close();
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
+ Connection connection2 = factory.createConnection();
+ try {
+
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection2.start();
+ MessageConsumer consumer = session2.createConsumer(session2.createQueue(TEST_QUEUE));
+
+ Message received = consumer.receive(5000);
+ assertNotNull(received);
+ assertEquals(42, received.getIntProperty("IntProperty"));
+
+ connection2.close();
+ } finally {
+ connection2.close();
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
@@ -211,4 +266,14 @@
brokerService = null;
}
}
+
+ private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+ AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = value;
+ }
+ message.setBytes(payload);
+ return message;
+ }
}
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index a86edbb..67e0dc9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -23,8 +23,10 @@
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -47,10 +49,13 @@
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.message.Message;
+import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@ -65,6 +70,8 @@
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
+ private static final int PAYLOAD = 110 * 1024;
+
@Test(timeout = 60000)
public void testSimpleSendOneReceiveOneToQueue() throws Exception {
doTestSimpleSendOneReceiveOne(Queue.class);
@@ -858,4 +865,66 @@
receiver.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
+ String testQueueName = "ConnectionFrameSize";
+ int nMsgs = 200;
+
+ AmqpClient client = createAmqpClient();
+
+ Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
+ Map<String, String> embeddedMap = new LinkedHashMap<>();
+ embeddedMap.put("test-key-1", "value-1");
+ embeddedMap.put("test-key-2", "value-2");
+ embeddedMap.put("test-key-3", "value-3");
+
+ {
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(testQueueName);
+ AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
+
+ message.setApplicationProperty("IntProperty", 42);
+ message.setDurable(true);
+ message.setMessageAnnotation(annotation.toString(), embeddedMap);
+ sender.send(message);
+ session.close();
+ connection.close();
+ }
+
+ {
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(testQueueName);
+ receiver.flow(nMsgs);
+
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Failed to read message with embedded map in annotations", message);
+ MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+ if (wrapped.getBody() instanceof Data) {
+ Data data = (Data) wrapped.getBody();
+ System.out.println("received : message: " + data.getValue().getLength());
+ assertEquals(PAYLOAD, data.getValue().getLength());
+ }
+
+ assertNotNull(message.getWrappedMessage().getMessageAnnotations());
+ assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
+ assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
+
+ message.accept();
+ session.close();
+ connection.close();
+ }
+ }
+
+ private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+ AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = value;
+ }
+ message.setBytes(payload);
+ return message;
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
index 3eaf28b..e856c5d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
@@ -96,7 +96,7 @@
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
- private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
+ private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index a265570..a3b9a4d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -532,9 +532,7 @@
@Override
public long getPendingMessageSize() {
- synchronized (pendingLock) {
- return pending.messageSize();
- }
+ return pending.messageSize();
}
@Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index e8ef717..fa75752 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -694,7 +694,7 @@
// While waiting for space to free up... the
// transaction may be done
if (message.isInTransaction()) {
- if (context.getTransaction().getState() > IN_USE_STATE) {
+ if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
throw new JMSException("Send transaction completed while waiting for space");
}
}
@@ -1471,10 +1471,7 @@
int count = 0;
Set<MessageReference> set = new LinkedHashSet<MessageReference>();
do {
- int oldMaxSize = getMaxPageSize();
- setMaxPageSize((int) this.destinationStatistics.getMessages().getCount());
- doPageIn(true);
- setMaxPageSize(oldMaxSize);
+ doPageIn(true, false, (messages.isCacheEnabled() || !broker.getBrokerService().isPersistent()) ? messages.size() : getMaxBrowsePageSize());
pagedInMessagesLock.readLock().lock();
try {
set.addAll(pagedInMessages.values());
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index a0f2d06..66e586c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -63,6 +63,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.JMSException;
+
+import static org.apache.activemq.transaction.Transaction.IN_USE_STATE;
+
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
@@ -409,8 +413,15 @@
public void run() {
try {
- // While waiting for space to free up... the
- // message may have expired.
+ // While waiting for space to free up...
+ // the transaction may be done
+ if (message.isInTransaction()) {
+ if (context.getTransaction() == null || context.getTransaction().getState() > IN_USE_STATE) {
+ throw new JMSException("Send transaction completed while waiting for space");
+ }
+ }
+
+ // the message may have expired.
if (message.isExpired()) {
broker.messageExpired(context, message, null);
getDestinationStatistics().getExpired().increment();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index bf3f97b..0bf1c4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -16,14 +16,6 @@
*/
package org.apache.activemq.broker.region;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -32,15 +24,7 @@
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
@@ -48,6 +32,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
public class TopicSubscription extends AbstractSubscription {
private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
@@ -61,7 +53,7 @@
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
- private int discarded;
+ private final AtomicInteger discarded = new AtomicInteger();
private final Object matchedListMutex = new Object();
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
@@ -448,9 +440,7 @@
@Override
public long getPendingMessageSize() {
- synchronized (matchedListMutex) {
- return matched.messageSize();
- }
+ return matched.messageSize();
}
@Override
@@ -482,9 +472,7 @@
* @return the number of messages discarded due to being a slow consumer
*/
public int discarded() {
- synchronized (matchedListMutex) {
- return discarded;
- }
+ return discarded.get();
}
/**
@@ -493,9 +481,7 @@
* prefetch buffer being full).
*/
public int matched() {
- synchronized (matchedListMutex) {
- return matched.size();
- }
+ return matched.size();
}
/**
@@ -727,7 +713,7 @@
try {
message.decrementReferenceCount();
matched.remove(message);
- discarded++;
+ discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index ba1a0c2..6a8b66a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1074,8 +1074,11 @@
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
localBroker.oneway(sending);
- //remove subscriber from map
+ //remove subscriber from local map
i.remove();
+
+ //need to remove the mapping from the remote map as well
+ subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java b/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
index 2b89d12..1b90e00 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/LDAPAuthorizationMap.java
@@ -102,7 +102,6 @@
initialContextFactory = "com.sun.jndi.ldap.LdapCtxFactory";
connectionURL = "ldap://localhost:10389";
connectionUsername = "uid=admin,ou=system";
- connectionPassword = "secret";
connectionProtocol = "s";
authentication = "simple";
@@ -491,4 +490,4 @@
return context;
}
-}
\ No newline at end of file
+}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
index 44c23f6..77cbb20 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
@@ -64,7 +64,7 @@
private final String initialContextFactory = "com.sun.jndi.ldap.LdapCtxFactory";
private String connectionURL = "ldap://localhost:1024";
private String connectionUsername = "uid=admin,ou=system";
- private String connectionPassword = "secret";
+ private String connectionPassword;
private String connectionProtocol = "s";
private String authentication = "simple";
diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
index 4179b7b..af040a2 100644
--- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
+++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQConfiguration.java
@@ -182,7 +182,7 @@
}
}
- public static Class<?> loadClass(String name, ClassLoader loader) throws ClassNotFoundException {
+ private static Class<?> loadClass(String name, ClassLoader loader) throws ClassNotFoundException {
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
if (contextClassLoader != null) {
try {
diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml
index 0a6f8e1..f5e2bae 100644
--- a/activemq-client/pom.xml
+++ b/activemq-client/pom.xml
@@ -32,6 +32,15 @@
<properties>
<surefire.argLine>-Xmx512M</surefire.argLine>
+ <activemq.osgi.import.pkg>
+ !com.google.errorprone.annotations,
+ !com.google.errorprone.annotations.concurrent,
+ *
+ </activemq.osgi.import.pkg>
+ <activemq.osgi.private.pkg>
+ com.google.errorprone.annotations,
+ com.google.errorprone.annotations.concurrent
+ </activemq.osgi.private.pkg>
</properties>
<dependencies>
diff --git a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 8c38c2d..40d2f1a 100644
--- a/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -94,7 +94,7 @@
* @return true if space
*/
@Override
- public boolean waitForSpace(long timeout) throws InterruptedException {
+ public boolean waitForSpace(final long timeout) throws InterruptedException {
if (parent != null) {
if (!parent.waitForSpace(timeout)) {
return false;
@@ -106,12 +106,15 @@
usageLock.readLock().unlock();
usageLock.writeLock().lock();
try {
- while (percentUsage >= 100 ) {
- waitForSpaceCondition.await(timeout, TimeUnit.MILLISECONDS);
+ final long deadline = timeout > 0 ? System.currentTimeMillis() + timeout : Long.MAX_VALUE;
+ long timeleft = deadline;
+ while (percentUsage >= 100 && timeleft > 0) {
+ waitForSpaceCondition.await(Math.min(getPollingTime(), timeleft), TimeUnit.MILLISECONDS);
+ timeleft = deadline - System.currentTimeMillis();
}
- usageLock.readLock().lock();
} finally {
usageLock.writeLock().unlock();
+ usageLock.readLock().lock();
}
}
diff --git a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
index b869939..24e47c2 100644
--- a/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
+++ b/activemq-client/src/test/java/org/apache/activemq/usage/MemoryUsageTest.java
@@ -17,9 +17,6 @@
package org.apache.activemq.usage;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -29,6 +26,11 @@
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
public class MemoryUsageTest {
MemoryUsage underTest;
@@ -83,6 +85,15 @@
assertEquals("limits are still matched whole", underTest.getLimit(), child.getLimit());
}
+ @Test(timeout=2000)
+ public void testLimitedWaitFail() throws Exception {
+ underTest.setLimit(10);
+ underTest.start();
+ underTest.increaseUsage(11);
+
+ assertFalse("did not get usage within limit", underTest.waitForSpace(500));
+ }
+
@Before
public void setUp() throws Exception {
underTest = new MemoryUsage();
diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
index f9f0fb0..3e246ce 100644
--- a/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
+++ b/activemq-console/src/main/java/org/apache/activemq/console/command/CreateCommand.java
@@ -209,6 +209,8 @@
// utlity method to write an xml source to file
private void writeToFile(Source src, File file) throws TransformerException {
TransformerFactory tFactory = TransformerFactory.newInstance();
+ tFactory.setFeature(javax.xml.XMLConstants.FEATURE_SECURE_PROCESSING, Boolean.TRUE);
+
Transformer fileTransformer = tFactory.newTransformer();
Result res = new StreamResult(file);
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
index 897a279..15f4de6 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/discovery/http/HTTPDiscoveryAgent.java
@@ -38,7 +38,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +53,7 @@
private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
private String registryURL = "http://localhost:8080/discovery-registry/default";
- private HttpClient httpClient = new DefaultHttpClient();
+ private HttpClient httpClient = HttpClientBuilder.create().build();
private AtomicBoolean running = new AtomicBoolean();
private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
private final HashSet<String> registeredServices = new HashSet<String>();
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index 2480daa..6582bb8 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -40,26 +40,23 @@
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.CookieSpecs;
+import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.params.CookiePolicy;
-import org.apache.http.client.params.HttpClientParams;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.params.ConnRoutePNames;
-import org.apache.http.conn.scheme.PlainSocketFactory;
-import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.BasicResponseHandler;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.AbstractHttpMessage;
-import org.apache.http.params.HttpConnectionParams;
-import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
@@ -72,7 +69,7 @@
*/
public class HttpClientTransport extends HttpTransportSupport {
- public static final int MAX_CLIENT_TIMEOUT = 30000;
+ public static final int MAX_CLIENT_TIMEOUT = 90000;
private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();
@@ -317,9 +314,10 @@
}
protected HttpClient createHttpClient() {
- DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
+ HttpClientBuilder clientBuilder = HttpClientBuilder.create();
+ clientBuilder.setConnectionManager(createClientConnectionManager());
if (useCompression) {
- client.addRequestInterceptor( new HttpRequestInterceptor() {
+ clientBuilder.addInterceptorLast(new HttpRequestInterceptor() {
@Override
public void process(HttpRequest request, HttpContext context) {
// We expect to received a compression response that we un-gzip
@@ -327,31 +325,30 @@
}
});
}
+
+ RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
if (getProxyHost() != null) {
HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
- client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
+ requestConfigBuilder.setProxy(proxy);
- if (client.getConnectionManager().getSchemeRegistry().get("http") == null) {
- client.getConnectionManager().getSchemeRegistry().register(
- new Scheme("http", getProxyPort(), PlainSocketFactory.getSocketFactory()));
- }
-
- if(getProxyUser() != null && getProxyPassword() != null) {
- client.getCredentialsProvider().setCredentials(
+ if (getProxyUser() != null && getProxyPassword() != null) {
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
new AuthScope(getProxyHost(), getProxyPort()),
new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
+ clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}
- HttpParams params = client.getParams();
- HttpConnectionParams.setSoTimeout(params, soTimeout);
- HttpClientParams.setCookiePolicy(params, CookiePolicy.BROWSER_COMPATIBILITY);
+ requestConfigBuilder.setSocketTimeout(soTimeout);
+ requestConfigBuilder.setCookieSpec(CookieSpecs.DEFAULT);
+ clientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
- return client;
+ return clientBuilder.build();
}
- protected ClientConnectionManager createClientConnectionManager() {
- return new PoolingClientConnectionManager();
+ protected HttpClientConnectionManager createClientConnectionManager() {
+ return new PoolingHttpClientConnectionManager();
}
protected void configureMethod(AbstractHttpMessage method) {
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
index 2e432fc..9f6774a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/https/HttpsClientTransport.java
@@ -24,11 +24,13 @@
import org.apache.activemq.transport.http.HttpClientTransport;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.http.conn.ClientConnectionManager;
-import org.apache.http.conn.scheme.Scheme;
-import org.apache.http.conn.scheme.SchemeRegistry;
-import org.apache.http.conn.ssl.SSLSocketFactory;
-import org.apache.http.impl.conn.PoolingClientConnectionManager;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
public class HttpsClientTransport extends HttpClientTransport {
@@ -37,19 +39,17 @@
}
@Override
- protected ClientConnectionManager createClientConnectionManager() {
- PoolingClientConnectionManager connectionManager = new PoolingClientConnectionManager(createSchemeRegistry());
- return connectionManager;
+ protected HttpClientConnectionManager createClientConnectionManager() {
+ return new PoolingHttpClientConnectionManager(createRegistry());
}
- private SchemeRegistry createSchemeRegistry() {
+ private Registry<ConnectionSocketFactory> createRegistry() {
- SchemeRegistry schemeRegistry = new SchemeRegistry();
+ RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create();
try {
- SSLSocketFactory sslSocketFactory = new SSLSocketFactory(createSocketFactory(),
- SSLSocketFactory.BROWSER_COMPATIBLE_HOSTNAME_VERIFIER);
- schemeRegistry.register(new Scheme("https", getRemoteUrl().getPort(), sslSocketFactory));
- return schemeRegistry;
+ SSLConnectionSocketFactory sslConnectionFactory = new SSLConnectionSocketFactory(createSocketFactory(), new DefaultHostnameVerifier());
+ registryBuilder.register("https", sslConnectionFactory);
+ return registryBuilder.build();
} catch (Exception e) {
throw new IllegalStateException("Failure trying to create scheme registry", e);
}
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java
deleted file mode 100644
index b0f5691..0000000
--- a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpClientTransportCookiePolicyTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.http;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.activemq.transport.util.TextWireFormat;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.params.HttpClientParams;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test that {@link HttpClientTransport} sets a broad-range compatibility
- * cookie policy.
- *
- * @see <a href="https://issues.apache.org/jira/browse/AMQ-6571">AMQ-6571: HttpClientTransport refuses to accept cookies using `Expires' header</a>
- */
-@SuppressWarnings("deprecation")
-public class HttpClientTransportCookiePolicyTest {
-
- private HttpClientTransport transport;
-
-
- /**
- * Create the transport so we can inspect it.
- * @throws URISyntaxException if something goes wrong.
- */
- @Before
- public void setUp() throws URISyntaxException {
- transport = new HttpClientTransport(mock(TextWireFormat.class), new URI("http://localhost:8080/test"));
- }
-
-
- /**
- * Create a new connection and check the connection properties.
- */
- @Test
- public void test() {
- HttpClient client = transport.createHttpClient();
- assertEquals("Cookie spec", org.apache.http.client.params.CookiePolicy.BROWSER_COMPATIBILITY, HttpClientParams.getCookiePolicy(client.getParams()));
- }
-}
diff --git a/activemq-jaas/pom.xml b/activemq-jaas/pom.xml
index f52ee08..4dcfeb7 100644
--- a/activemq-jaas/pom.xml
+++ b/activemq-jaas/pom.xml
@@ -69,22 +69,15 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-core-integ</artifactId>
<version>${directory-version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>bouncycastle</groupId>
- <artifactId>bcprov-jdk15</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15</artifactId>
- <version>1.46</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
diff --git a/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java b/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
index 95781cc..42427d0 100644
--- a/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
+++ b/activemq-jaas/src/main/java/org/apache/activemq/jaas/ReloadableProperties.java
@@ -24,6 +24,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,7 @@
private Properties props = new Properties();
private Map<String, String> invertedProps;
private Map<String, Set<String>> invertedValueProps;
+ private Map<String, Pattern> regexpProps;
private long reloadTime = -1;
private final PropertiesLoader.FileNameKey key;
@@ -51,6 +54,7 @@
load(key.file(), props);
invertedProps = null;
invertedValueProps = null;
+ regexpProps = null;
if (key.isDebug()) {
LOG.debug("Load of: " + key);
}
@@ -69,7 +73,10 @@
if (invertedProps == null) {
invertedProps = new HashMap<>(props.size());
for (Map.Entry<Object, Object> val : props.entrySet()) {
- invertedProps.put((String) val.getValue(), (String) val.getKey());
+ String str = (String) val.getValue();
+ if (!looksLikeRegexp(str)) {
+ invertedProps.put(str, (String) val.getKey());
+ }
}
}
return invertedProps;
@@ -93,6 +100,24 @@
return invertedValueProps;
}
+ public synchronized Map<String, Pattern> regexpPropertiesMap() {
+ if (regexpProps == null) {
+ regexpProps = new HashMap<>(props.size());
+ for (Map.Entry<Object, Object> val : props.entrySet()) {
+ String str = (String) val.getValue();
+ if (looksLikeRegexp(str)) {
+ try {
+ Pattern p = Pattern.compile(str.substring(1, str.length() - 1));
+ regexpProps.put((String) val.getKey(), p);
+ } catch (PatternSyntaxException e) {
+ LOG.warn("Ignoring invalid regexp: " + str);
+ }
+ }
+ }
+ }
+ return regexpProps;
+ }
+
private void load(final File source, Properties props) throws IOException {
FileInputStream in = new FileInputStream(source);
try {
@@ -116,4 +141,9 @@
return key.file.lastModified() > reloadTime;
}
+ private boolean looksLikeRegexp(String str) {
+ int len = str.length();
+ return len > 2 && str.charAt(0) == '/' && str.charAt(len - 1) == '/';
+ }
+
}
diff --git a/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java b/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
index 42f2c9d..c316367 100644
--- a/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
+++ b/activemq-jaas/src/main/java/org/apache/activemq/jaas/TextFileCertificateLoginModule.java
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.regex.Pattern;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
@@ -32,13 +33,14 @@
/**
* A LoginModule allowing for SSL certificate based authentication based on
* Distinguished Names (DN) stored in text files. The DNs are parsed using a
- * Properties class where each line is <user_name>=<user_DN>. This class also
- * uses a group definition file where each line is <group_name>=<user_name_1>,<user_name_2>,etc.
+ * Properties class where each line is either <UserName>=<StringifiedSubjectDN>
+ * or <UserName>=/<SubjectDNRegExp>/. This class also uses a group definition
+ * file where each line is <GroupName>=<UserName1>,<UserName2>,etc.
* The user and group files' locations must be specified in the
* org.apache.activemq.jaas.textfiledn.user and
- * org.apache.activemq.jaas.textfiledn.user properties respectively. NOTE: This
- * class will re-read user and group files for every authentication (i.e it does
- * live updates of allowed groups and users).
+ * org.apache.activemq.jaas.textfiledn.group properties respectively.
+ * NOTE: This class will re-read user and group files for every authentication
+ * (i.e it does live updates of allowed groups and users).
*
* @author sepandm@gmail.com (Sepand)
*/
@@ -48,6 +50,7 @@
private static final String GROUP_FILE_PROP_NAME = "org.apache.activemq.jaas.textfiledn.group";
private Map<String, Set<String>> groupsByUser;
+ private Map<String, Pattern> regexpByUser;
private Map<String, String> usersByDn;
/**
@@ -58,6 +61,7 @@
super.initialize(subject, callbackHandler, sharedState, options);
usersByDn = load(USER_FILE_PROP_NAME, "", options).invertedPropertiesMap();
+ regexpByUser = load(USER_FILE_PROP_NAME, "", options).regexpPropertiesMap();
groupsByUser = load(GROUP_FILE_PROP_NAME, "", options).invertedPropertiesValuesMap();
}
@@ -76,8 +80,8 @@
if (certs == null) {
throw new LoginException("Client certificates not found. Cannot authenticate.");
}
-
- return usersByDn.get(getDistinguishedName(certs));
+ String dn = getDistinguishedName(certs);
+ return usersByDn.containsKey(dn) ? usersByDn.get(dn) : getUserByRegexp(dn);
}
/**
@@ -96,4 +100,17 @@
}
return userGroups;
}
+
+ private synchronized String getUserByRegexp(String dn) {
+ String name = null;
+ for (Map.Entry<String, Pattern> val : regexpByUser.entrySet()) {
+ if (val.getValue().matcher(dn).matches()) {
+ name = val.getKey();
+ break;
+ }
+ }
+ usersByDn.put(dn, name);
+ return name;
+ }
+
}
diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
index 111e730..00911ec 100644
--- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
+++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java
@@ -81,8 +81,8 @@
@Override
public void close() throws JMSException {
- this.cleanupConnectionTemporaryDestinations();
this.cleanupAllLoanedSessions();
+ this.cleanupConnectionTemporaryDestinations();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
index 86b4e27..d879580 100644
--- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
+++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionFactoryWithTemporaryDestinationsTest.java
@@ -19,6 +19,7 @@
import static org.junit.Assert.assertEquals;
import javax.jms.Connection;
+import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
@@ -119,6 +120,24 @@
assertEquals(0, countBrokerTemporaryTopics());
}
+ @Test(timeout = 60000)
+ public void testTemporaryQueueLeakAfterConnectionCloseWithConsumer() throws Exception {
+ Connection pooledConnection = null;
+ Session session = null;
+ Queue tempQueue = null;
+ for (int i = 0; i < 2; i++) {
+ pooledConnection = pooledFactory.createConnection();
+ session = pooledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ tempQueue = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(tempQueue);
+ consumer.receiveNoWait();
+ LOG.info("Created queue named: " + tempQueue.getQueueName());
+ pooledConnection.close();
+ }
+
+ assertEquals(0, countBrokerTemporaryQueues());
+ }
+
private int countBrokerTemporaryQueues() throws Exception {
return ((RegionBroker) brokerService.getRegionBroker()).getTempQueueRegion().getDestinationMap().size();
}
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 9660afc..1a120a2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1397,6 +1397,8 @@
if (before != null) {
before.sequenceAssignedWithIndexLocked(-1);
}
+ // Moving the checkpoint pointer as there is no persistent operations in this transaction to be replayed
+ processLocation(location);
return;
}
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index f143a07..fcf6ba2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -56,7 +56,6 @@
import org.apache.activemq.store.TransactionIdTransformerAware;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@@ -554,6 +553,15 @@
return transactionStore.getJournalMaxWriteBatchSize();
}
+
+ public void setJournalCleanupInterval(long journalCleanupInterval) {
+ transactionStore.setJournalCleanupInterval(journalCleanupInterval);
+ }
+
+ public long getJournalCleanupInterval() {
+ return transactionStore.getJournalCleanupInterval();
+ }
+
public List<PersistenceAdapter> getAdapters() {
return Collections.unmodifiableList(adapters);
}
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index f13fc53..55931a6 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -63,11 +63,13 @@
static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
- final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
+ final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
private Journal journal;
private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
private final AtomicBoolean started = new AtomicBoolean(false);
+ private final AtomicBoolean recovered = new AtomicBoolean(false);
+ private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
@@ -190,6 +192,14 @@
this.journalWriteBatchSize = journalWriteBatchSize;
}
+ public void setJournalCleanupInterval(long journalCleanupInterval) {
+ this.journalCleanupInterval = journalCleanupInterval;
+ }
+
+ public long getJournalCleanupInterval() {
+ return journalCleanupInterval;
+ }
+
public class Tx {
private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
private int prepareLocationId = 0;
@@ -284,10 +294,12 @@
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
+ pendingCommit.put(txid, tx);
}
public void persistCompletion(TransactionId txid) throws IOException {
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
+ pendingCommit.remove(txid);
}
private Location store(JournalCommand<?> data) throws IOException {
@@ -328,18 +340,26 @@
journal.setDirectory(getDirectory());
journal.setMaxFileLength(journalMaxFileLength);
journal.setWriteBatchSize(journalWriteBatchSize);
+ journal.setCleanupInterval(journalCleanupInterval);
IOHelper.mkdirs(journal.getDirectory());
journal.start();
recoverPendingLocalTransactions();
+ recovered.set(true);
store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
}
}
private void txStoreCleanup() {
+ if (!recovered.get()) {
+ return;
+ }
Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
for (Tx tx : inflightTransactions.values()) {
knownDataFileIds.remove(tx.getPreparedLocationId());
}
+ for (Tx tx : pendingCommit.values()) {
+ knownDataFileIds.remove(tx.getPreparedLocationId());
+ }
try {
journal.removeDataFiles(knownDataFileIds);
} catch (Exception e) {
@@ -362,11 +382,11 @@
private void recoverPendingLocalTransactions() throws IOException {
Location location = journal.getNextLocation(null);
while (location != null) {
- process(load(location));
+ process(location, load(location));
location = journal.getNextLocation(location);
}
- recoveredPendingCommit.addAll(inflightTransactions.keySet());
- LOG.info("pending local transactions: " + recoveredPendingCommit);
+ pendingCommit.putAll(inflightTransactions);
+ LOG.info("pending local transactions: " + pendingCommit.keySet());
}
public JournalCommand<?> load(Location location) throws IOException {
@@ -381,11 +401,11 @@
return message;
}
- public void process(JournalCommand<?> command) throws IOException {
+ public void process(final Location location, JournalCommand<?> command) throws IOException {
switch (command.type()) {
case KAHA_PREPARE_COMMAND:
KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
- getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
+ getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
break;
case KAHA_COMMIT_COMMAND:
KahaCommitCommand commitCommand = (KahaCommitCommand) command;
@@ -422,10 +442,9 @@
for (TransactionId txid : broker.getPreparedTransactions(null)) {
if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
try {
- if (recoveredPendingCommit.contains(txid)) {
+ if (pendingCommit.keySet().contains(txid)) {
LOG.info("delivering pending commit outcome for tid: " + txid);
broker.commitTransaction(null, txid, false);
-
} else {
LOG.info("delivering rollback outcome to store for tid: " + txid);
broker.forgetTransaction(null, txid);
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 67d4c86..9a6e256 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -124,6 +124,14 @@
}
}
+ public void setCleanupInterval(long cleanupInterval) {
+ this.cleanupInterval = cleanupInterval;
+ }
+
+ public long getCleanupInterval() {
+ return cleanupInterval;
+ }
+
public enum PreallocationStrategy {
SPARSE_FILE,
OS_KERNEL_COPY,
@@ -230,6 +238,7 @@
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
private File osKernelCopyTemplateFile = null;
private ByteBuffer preAllocateDirectBuffer = null;
+ private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;
protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
@@ -345,7 +354,7 @@
public void run() {
cleanup();
}
- }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);
+ }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
long end = System.currentTimeMillis();
LOG.trace("Startup took: "+(end-start)+" ms");
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
index 52b2f99..c91020c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/Transaction.java
@@ -22,6 +22,8 @@
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.Iterator;
@@ -34,6 +36,8 @@
*/
public class Transaction implements Iterable<Page> {
+ private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);
+
private RandomAccessFile tmpFile;
private File txFile;
private long nextLocation = 0;
@@ -656,6 +660,7 @@
public void commit() throws IOException {
if( writeTransactionId!=-1 ) {
if (tmpFile != null) {
+ LOG.debug("Committing transaction {}: Size {} kb", writeTransactionId, tmpFile.length() / (1024));
pageFile.removeTmpFile(getTempFile(), tmpFile);
tmpFile = null;
txFile = null;
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
index 4a0fbc4..5889c6a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java
@@ -20,6 +20,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -578,6 +579,9 @@
}
}
+ List<Integer> removedJobFileIds = new ArrayList<>();
+ HashMap<Integer, Integer> decrementJournalCount = new HashMap<>();
+
for (Long executionTime : keys) {
List<JobLocation> values = this.index.remove(tx, executionTime);
if (location != null) {
@@ -586,9 +590,9 @@
// Remove the references for add and reschedule commands for this job
// so that those logs can be GC'd when free.
- this.store.decrementJournalCount(tx, job.getLocation());
+ decrementJournalCount.compute(job.getLocation().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
if (job.getLastUpdate() != null) {
- this.store.decrementJournalCount(tx, job.getLastUpdate());
+ decrementJournalCount.compute(job.getLastUpdate().getDataFileId(), (key, value) -> value == null ? 1 : value + 1);
}
// now that the job is removed from the index we can store the remove info and
@@ -597,11 +601,19 @@
// the same file we don't need to track it and just let a normal GC of the logs
// remove it when the log is unreferenced.
if (job.getLocation().getDataFileId() != location.getDataFileId()) {
- this.store.referenceRemovedLocation(tx, location, job);
+ removedJobFileIds.add(job.getLocation().getDataFileId());
}
}
}
}
+
+ if (!removedJobFileIds.isEmpty()) {
+ this.store.referenceRemovedLocation(tx, location, removedJobFileIds);
+ }
+
+ if (decrementJournalCount.size() > 0) {
+ this.store.decrementJournalCount(tx, decrementJournalCount);
+ }
}
/**
@@ -657,22 +669,35 @@
* @param tx
* the transaction under which this operation was invoked.
*
- * @return a list of all referenced Location values for this JobSchedulerImpl
+ * @return a iterator of all referenced Location values for this JobSchedulerImpl
*
* @throws IOException if an error occurs walking the scheduler tree.
*/
- protected List<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
- List<JobLocation> references = new ArrayList<>();
+ protected Iterator<JobLocation> getAllScheduledJobs(Transaction tx) throws IOException {
+ return new Iterator<JobLocation>() {
- for (Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx); i.hasNext();) {
- Map.Entry<Long, List<JobLocation>> entry = i.next();
- List<JobLocation> scheduled = entry.getValue();
- for (JobLocation job : scheduled) {
- references.add(job);
+ final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator = index.iterator(tx);
+ Iterator<JobLocation> iterator;
+
+ @Override
+ public boolean hasNext() {
+
+ while (iterator == null || !iterator.hasNext()) {
+ if (!mapIterator.hasNext()) {
+ break;
+ }
+
+ iterator = new ArrayList<>(mapIterator.next().getValue()).iterator();
+ }
+
+ return iterator != null && iterator.hasNext();
}
- }
- return references;
+ @Override
+ public JobLocation next() {
+ return iterator.next();
+ }
+ };
}
@Override
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
index 79059f1..7720159 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java
@@ -322,7 +322,7 @@
@Override
public boolean accept(File dir, String name) {
- if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
+ if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log") || name.endsWith(".free")) {
return true;
}
return false;
@@ -493,6 +493,37 @@
}
/**
+ * Removes multiple references for the Journal log file indicated in the given Location map.
+ *
+ * The references are used to track which log files cannot be GC'd. When the reference count
+ * on a log file reaches zero the file id is removed from the tracker and the log will be
+ * removed on the next check point update.
+ *
+ * @param tx
+ * The TX under which the update is to be performed.
+ * @param decrementsByFileIds
+ * Map indicating how many decrements per fileId.
+ *
+ * @throws IOException if an error occurs while updating the journal references table.
+ */
+ protected void decrementJournalCount(Transaction tx, HashMap<Integer, Integer> decrementsByFileIds) throws IOException {
+ for(Map.Entry<Integer, Integer> entry : decrementsByFileIds.entrySet()) {
+ int logId = entry.getKey();
+ Integer refCount = metaData.getJournalRC().get(tx, logId);
+
+ if (refCount != null) {
+ int refCountValue = refCount;
+ refCountValue -= entry.getValue();
+ if (refCountValue <= 0) {
+ metaData.getJournalRC().remove(tx, logId);
+ } else {
+ metaData.getJournalRC().put(tx, logId, refCountValue);
+ }
+ }
+ }
+ }
+
+ /**
* Updates the Job removal tracking index with the location of a remove command and the
* original JobLocation entry.
*
@@ -520,6 +551,33 @@
}
/**
+ * Updates the Job removal tracking index with the location of a remove command and the
+ * original JobLocation entry.
+ *
+ * The JobLocation holds the locations in the logs where the add and update commands for
+ * a job stored. The log file containing the remove command can only be discarded after
+ * both the add and latest update log files have also been discarded.
+ *
+ * @param tx
+ * The TX under which the update is to be performed.
+ * @param location
+ * The location value to reference a remove command.
+ * @param removedJobsFileId
+ * List of the original JobLocation instances that holds the add and update locations
+ *
+ * @throws IOException if an error occurs while updating the remove location tracker.
+ */
+ protected void referenceRemovedLocation(Transaction tx, Location location, List<Integer> removedJobsFileId) throws IOException {
+ int logId = location.getDataFileId();
+ List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
+ if (removed == null) {
+ removed = new ArrayList<Integer>();
+ }
+ removed.addAll(removedJobsFileId);
+ this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
+ }
+
+ /**
* Retrieve the scheduled Job's byte blob from the journal.
*
* @param location
@@ -826,8 +884,8 @@
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
LOG.trace("Removed Job past last appened in the journal: {}", job.getJobId());
@@ -854,8 +912,8 @@
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
missingJournalFiles.add(job.getLocation().getDataFileId());
if (job.getLastUpdate() != null) {
missingJournalFiles.add(job.getLastUpdate().getDataFileId());
@@ -937,8 +995,8 @@
Map.Entry<String, JobSchedulerImpl> entry = i.next();
JobSchedulerImpl scheduler = entry.getValue();
- List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
- for (JobLocation job : jobs) {
+ for (Iterator<JobLocation> jobLocationIterator = scheduler.getAllScheduledJobs(tx); jobLocationIterator.hasNext();) {
+ final JobLocation job = jobLocationIterator.next();
// Remove all jobs in missing log files.
if (missing.contains(job.getLocation().getDataFileId())) {
diff --git a/activemq-karaf/pom.xml b/activemq-karaf/pom.xml
index 3859da9..60d64ee 100644
--- a/activemq-karaf/pom.xml
+++ b/activemq-karaf/pom.xml
@@ -33,9 +33,8 @@
<properties>
<xpp3-bundle-version>1.1.4c_5</xpp3-bundle-version>
<jodatime-bundle-version>2.9</jodatime-bundle-version>
- <jdom-bundle-version>1.1_4</jdom-bundle-version>
<dom4j-bundle-version>1.6.1_2</dom4j-bundle-version>
- <xstream-bundle-version>1.4.8_1</xstream-bundle-version>
+ <xstream-bundle-version>1.4.11.1_1</xstream-bundle-version>
<servicemix.specs.version>2.4.0</servicemix.specs.version>
</properties>
@@ -154,7 +153,7 @@
</artifact>
<artifact>
<file>target/classes/activemq.xml</file>
- <type>xml</type>
+ <type>xml</type>>
<classifier>activemq</classifier>
</artifact>
<artifact>
@@ -180,12 +179,17 @@
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
<Export-Package>org.apache.activemq.karaf*;version=${project.version};-split-package:=merge-first</Export-Package>
<Import-Package>
+ !com.google.errorprone.annotations,
+ !com.google.errorprone.annotations.concurrent,
org.apache.felix.gogo.commands,
org.apache.karaf.shell.console;version="[2,4)",
org.apache.karaf.shell.console.commands;version="[2,4)",
*
</Import-Package>
- <Private-Package>!*</Private-Package>
+ <Private-Package>
+ com.google.errorprone.annotations,
+ com.google.errorprone.annotations.concurrent
+ </Private-Package>
<_versionpolicy>[$(version;==;$(@)),$(version;+;$(@)))</_versionpolicy>
<Embed-Transitive>true</Embed-Transitive>
</instructions>
diff --git a/activemq-karaf/src/main/resources/features.xml b/activemq-karaf/src/main/resources/features.xml
index c989705..9c514df 100644
--- a/activemq-karaf/src/main/resources/features.xml
+++ b/activemq-karaf/src/main/resources/features.xml
@@ -25,15 +25,15 @@
<!-- Starts the broker with default configuration -->
<feature name="activemq-broker-noweb" description="Full ActiveMQ broker with default configuration" version="${project.version}" resolver="(obr)" start-level="50">
<feature version="${project.version}">activemq</feature>
- <configfile finalname="/etc/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
- <configfile finalname="/etc/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
+ <configfile finalname="${karaf.etc}/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
+ <configfile finalname="${karaf.etc}/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
</feature>
<!-- Starts the broker with default configuration and web console -->
<feature name="activemq-broker" description="Full ActiveMQ broker with default configuration and web console" version="${project.version}" resolver="(obr)" start-level="50">
<feature version="${project.version}">activemq</feature>
- <configfile finalname="/etc/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
- <configfile finalname="/etc/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
+ <configfile finalname="${karaf.etc}/activemq.xml">mvn:org.apache.activemq/activemq-karaf/${project.version}/xml/activemq</configfile>
+ <configfile finalname="${karaf.etc}/org.apache.activemq.server-default.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq</configfile>
<feature version="${project.version}">activemq-web-console</feature>
</feature>
@@ -49,7 +49,7 @@
<feature name="activemq-web-console" version="${project.version}" resolver="(obr)" start-level="50">
<feature start-level="10">war</feature>
<feature start-level="10">eventadmin</feature>
- <configfile finalname="/etc/org.apache.activemq.webconsole.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq-webconsole</configfile>
+ <configfile finalname="${karaf.etc}/org.apache.activemq.webconsole.cfg">mvn:org.apache.activemq/activemq-karaf/${project.version}/cfg/activemq-webconsole</configfile>
<bundle>mvn:org.apache.activemq/activemq-web-console/${project.version}/war</bundle>
</feature>
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 15fb1b1..d8ac05c 100644
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -320,7 +320,7 @@
<!-- for the paho dependency -->
<repository>
<id>spring.io</id>
- <url>http://repo.spring.io/plugins-release</url>
+ <url>https://repo.spring.io/plugins-release</url>
<releases><enabled>true</enabled></releases>
<snapshots><enabled>false</enabled></snapshots>
</repository>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 9a406f0..9f8bd8a 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -35,6 +35,10 @@
!org.apache.commons.daemon,
!org.apache.maven*,
!com.google.thirdparty.publicsuffix,
+ !com.rometools*,
+ !com.google.errorprone.annotations,
+ !com.google.errorprone.annotations.concurrent,
+ !com.google.j2objc.annotations,
sun.misc*;resolution:=optional,
sun.nio*;resolution:=optional,
javax.jmdns*;resolution:=optional,
@@ -62,7 +66,7 @@
org.springframework*;version="[4,5)";resolution:=optional,
org.xmlpull*;resolution:=optional,
scala*;resolution:=optional,
- javax.annotation*,
+ javax.annotation*;version="[1,4)",
!com.thoughtworks.qdox*,
org.apache.commons.logging;version="[1.2,2)";resolution:=optional,
javax.jms*;version="[1.1,3)",
@@ -92,6 +96,9 @@
org.fusesource.hawtbuf*,
org.apache.qpid*,
com.google.common*,
+ com.google.errorprone.annotations,
+ com.google.errorprone.annotations.concurrent,
+ com.google.j2objc.annotations,
org.linkedin*,
org.iq80*
</activemq.osgi.private.pkg>
@@ -163,7 +170,8 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-web</artifactId>
<exclusions>
- <exclusion>
+ <exclusion>
+ <groupId>${project.groupId}</groupId>
<artifactId>activemq-all</artifactId>
</exclusion>
</exclusions>
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index a89d5ee..b3deac4 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -22,7 +22,7 @@
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -116,8 +116,10 @@
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<>();
private final StompTransport stompTransport;
- private final ConcurrentMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<>();
- private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+ // Global Map shared with all subscriptions to allow finding the sub associated with an ACK Id
+ private final ConcurrentMap<String, StompAckEntry> pendingAcksTracker = new ConcurrentHashMap<>();
+ // Read-Only view used in this class to enforce the separation of read vs update of the global index.
+ private final Map<String, StompAckEntry> pendingAcks = Collections.unmodifiableMap(pendingAcksTracker);
private final Object commnadIdMutex = new Object();
private int lastCommandId;
@@ -131,34 +133,6 @@
private float hbGracePeriodMultiplier = 1.0f;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
- private static class AckEntry {
-
- private final String messageId;
- private final StompSubscription subscription;
-
- public AckEntry(String messageId, StompSubscription subscription) {
- this.messageId = messageId;
- this.subscription = subscription;
- }
-
- public MessageAck onMessageAck(TransactionId transactionId) {
- return subscription.onStompMessageAck(messageId, transactionId);
- }
-
- public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
- return subscription.onStompMessageNack(messageId, transactionId);
- }
-
- public String getMessageId() {
- return this.messageId;
- }
-
- @SuppressWarnings("unused")
- public StompSubscription getSubscription() {
- return this.subscription;
- }
- }
-
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.brokerContext = brokerContext;
@@ -297,7 +271,11 @@
// Let the stomp client know about any protocol errors.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
- exception.printStackTrace(stream);
+ if (exception instanceof SecurityException || exception.getCause() instanceof SecurityException) {
+ stream.write(exception.getLocalizedMessage());
+ } else {
+ exception.printStackTrace(stream);
+ }
stream.close();
HashMap<String, String> headers = new HashMap<>();
@@ -383,9 +361,9 @@
boolean nacked = false;
if (ackId != null) {
- AckEntry pendingAck = this.pedingAcks.remove(ackId);
+ StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
- messageId = pendingAck.getMessageId();
+ messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageNack(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@@ -439,9 +417,9 @@
boolean acked = false;
if (ackId != null) {
- AckEntry pendingAck = this.pedingAcks.remove(ackId);
+ StompAckEntry pendingAck = this.pendingAcks.get(ackId);
if (pendingAck != null) {
- messageId = pendingAck.getMessageId();
+ messageId = pendingAck.getMessageId().toString();
MessageAck ack = pendingAck.onMessageAck(activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
@@ -522,8 +500,6 @@
sub.onStompCommit(activemqTx);
}
- pedingAcks.clear();
-
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@@ -553,8 +529,6 @@
}
}
- pedingAcks.clear();
-
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connectionId);
tx.setTransactionId(activemqTx);
@@ -620,9 +594,9 @@
StompSubscription stompSubscription;
if (!consumerInfo.isBrowser()) {
- stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
} else {
- stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+ stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION), pendingAcksTracker);
}
stompSubscription.setDestination(actualDest);
@@ -841,6 +815,7 @@
protected void onStompDisconnect(StompFrame command) throws ProtocolException {
if (connected.get()) {
+ LOG.trace("Connection closed with {} pending ACKs still being tracked.", pendingAcks.size());
sendToActiveMQ(connectionInfo.createRemoveCommand(), createResponseHandler(command));
sendToActiveMQ(new ShutdownInfo(), createResponseHandler(command));
connected.set(false);
@@ -876,19 +851,7 @@
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
- String ackId = null;
- if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO && md.getMessage() != null) {
- AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
- ackId = this.ACK_ID_GENERATOR.generateId();
- this.pedingAcks.put(ackId, pendingAck);
- }
- try {
- sub.onMessageDispatch(md, ackId);
- } catch (Exception ex) {
- if (ackId != null) {
- this.pedingAcks.remove(ackId);
- }
- }
+ sub.onMessageDispatch(md);
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
@@ -1048,26 +1011,15 @@
return result;
}
- /**
- * Remove all pending acknowledgement markers that are batched into the single
- * client acknowledge operation.
- *
- * @param subscription
- * The STOMP Subscription that has performed a client acknowledge.
- * @param msgIdsToRemove
- * List of message IDs that are bound to the subscription that has ack'd
- */
- protected void afterClientAck(StompSubscription subscription, ArrayList<String> msgIdsToRemove) {
- int count = 0;
+ boolean isStomp10() {
+ return version.equals(Stomp.V1_0);
+ }
- for (Map.Entry<String,AckEntry> entry : this.pedingAcks.entrySet()){
- AckEntry actEntry = entry.getValue();
- if (msgIdsToRemove.contains(actEntry.messageId)) {
- this.pedingAcks.remove(entry.getKey());
- count++;
- }
- }
+ boolean isStomp11() {
+ return version.equals(Stomp.V1_1);
+ }
- LOG.trace("Subscription:[{}] client acknowledged {} messages", subscription.getSubscriptionId(), count);
+ boolean isStomp12() {
+ return version.equals(Stomp.V1_2);
}
}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
new file mode 100644
index 0000000..1edcb62
--- /dev/null
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompAckEntry.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
+/**
+ * Tracker object for Messages that carry STOMP v1.2 ACK IDs
+ */
+public class StompAckEntry {
+
+ private final String ackId;
+ private final MessageId messageId;
+ private final StompSubscription subscription;
+ private final MessageDispatch dispatch;
+
+ public StompAckEntry(MessageDispatch dispatch, String ackId, StompSubscription subscription) {
+ this.messageId = dispatch.getMessage().getMessageId();
+ this.subscription = subscription;
+ this.ackId = ackId;
+ this.dispatch = dispatch;
+ }
+
+ public MessageAck onMessageAck(TransactionId transactionId) {
+ return subscription.onStompMessageAck(messageId.toString(), transactionId);
+ }
+
+ public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+ return subscription.onStompMessageNack(messageId.toString(), transactionId);
+ }
+
+ public MessageId getMessageId() {
+ return this.messageId;
+ }
+
+ public MessageDispatch getMessageDispatch() {
+ return this.dispatch;
+ }
+
+ public String getAckId() {
+ return this.ackId;
+ }
+
+ public StompSubscription getSubscription() {
+ return this.subscription;
+ }
+
+ @Override
+ public String toString() {
+ return "AckEntry[ msgId:" + messageId + ", ackId:" + ackId + ", sub:" + subscription + " ]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((messageId == null) ? 0 : messageId.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ StompAckEntry other = (StompAckEntry) obj;
+ if (messageId == null) {
+ if (other.messageId != null) {
+ return false;
+ }
+ } else if (!messageId.equals(other.messageId)) {
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
index 9e267ac..1238572 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
+import java.util.Map;
import javax.jms.JMSException;
@@ -27,15 +28,14 @@
public class StompQueueBrowserSubscription extends StompSubscription {
- public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
- super(stompTransport, subscriptionId, consumerInfo, transformation);
+ public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
+ super(stompTransport, subscriptionId, consumerInfo, transformation, pendingAcks);
}
@Override
- void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
-
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
if (md.getMessage() != null) {
- super.onMessageDispatch(md, ackId);
+ super.onMessageDispatch(md);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
@@ -52,5 +52,4 @@
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
}
-
}
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 0d8e308..95fe986 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -17,12 +17,10 @@
package org.apache.activemq.transport.stomp;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Map.Entry;
import javax.jms.JMSException;
@@ -34,6 +32,9 @@
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.util.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
@@ -42,6 +43,10 @@
*/
public class StompSubscription {
+ private static final Logger LOG = LoggerFactory.getLogger(StompSubscription.class);
+
+ private static final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
public static final String AUTO_ACK = Stomp.Headers.Subscribe.AckModeValues.AUTO;
public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
@@ -50,27 +55,37 @@
protected final String subscriptionId;
protected final ConsumerInfo consumerInfo;
- protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<>();
- protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<>();
+ protected final Map<MessageId, StompAckEntry> dispatchedMessage = new LinkedHashMap<>();
+ protected final Map<String, StompAckEntry> pendingAcks; // STOMP v1.2 requires ACK ID tracking
+ protected final LinkedList<StompAckEntry> transactedMessages = new LinkedList<>();
protected String ackMode = AUTO_ACK;
protected ActiveMQDestination destination;
protected String transformation;
- public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+ public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation, Map<String, StompAckEntry> pendingAcks) {
this.protocolConverter = stompTransport;
this.subscriptionId = subscriptionId;
this.consumerInfo = consumerInfo;
this.transformation = transformation;
+ this.pendingAcks = pendingAcks;
}
- void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
+ void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
- if (ackMode.equals(CLIENT_ACK) || ackMode.equals(INDIVIDUAL_ACK)) {
+
+ String ackId = null;
+ if (isClientAck() || isIndividualAck()) {
+ ackId = ACK_ID_GENERATOR.generateId();
+ StompAckEntry pendingAck = new StompAckEntry(md, ackId, this);
+
synchronized (this) {
- dispatchedMessage.put(message.getMessageId(), md);
+ dispatchedMessage.put(message.getMessageId(), pendingAck);
}
- } else if (ackMode.equals(AUTO_ACK)) {
+ if (protocolConverter.isStomp12()) {
+ this.pendingAcks.put(ackId, pendingAck);
+ }
+ } else if (isAutoAck()) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getStompTransport().sendToActiveMQ(ack);
}
@@ -93,35 +108,48 @@
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
- if (ackId != null) {
+ if (protocolConverter.isStomp12() && ackId != null) {
command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
}
- protocolConverter.getStompTransport().sendToStomp(command);
+ try {
+ protocolConverter.getStompTransport().sendToStomp(command);
+ } catch (IOException ex) {
+ if (ackId != null) {
+ pendingAcks.remove(ackId);
+ }
+ throw ex;
+ }
}
synchronized void onStompAbort(TransactionId transactionId) {
- unconsumedMessage.clear();
+ // Restore the pending ACKs so that their ACK IDs are again valid for a client
+ // to operate on.
+ LOG.trace("Transaction Abort restoring {} pending ACKs to valid state.", transactedMessages.size());
+ for (StompAckEntry ackEntry : transactedMessages) {
+ if (protocolConverter.isStomp12()) {
+ pendingAcks.put(ackEntry.getAckId(), ackEntry);
+ }
+ }
+ transactedMessages.clear();
}
void onStompCommit(TransactionId transactionId) {
MessageAck ack = null;
synchronized (this) {
- for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
- @SuppressWarnings("rawtypes")
- Map.Entry entry = (Entry)iter.next();
- MessageDispatch msg = (MessageDispatch)entry.getValue();
- if (unconsumedMessage.contains(msg)) {
- iter.remove();
+ for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+ StompAckEntry ackEntry = iterator.next();
+ if (transactedMessages.contains(ackEntry)) {
+ iterator.remove();
}
}
// For individual Ack we already sent an Ack that will be applied on commit
// we don't send a second standard Ack as that would produce an error.
- if (!unconsumedMessage.isEmpty() && ackMode == CLIENT_ACK) {
- ack = new MessageAck(unconsumedMessage.getLast(), MessageAck.STANDARD_ACK_TYPE, unconsumedMessage.size());
+ if (!transactedMessages.isEmpty() && isClientAck()) {
+ ack = new MessageAck(transactedMessages.getLast().getMessageDispatch(), MessageAck.STANDARD_ACK_TYPE, transactedMessages.size());
ack.setTransactionId(transactionId);
- unconsumedMessage.clear();
+ transactedMessages.clear();
}
}
// avoid contention with onMessageDispatch
@@ -131,10 +159,10 @@
}
synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
-
MessageId msgId = new MessageId(messageId);
- if (!dispatchedMessage.containsKey(msgId)) {
+ final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+ if (ackEntry == null) {
return null;
}
@@ -142,35 +170,33 @@
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
- final ArrayList<String> acknowledgedMessages = new ArrayList<>();
-
- if (ackMode == CLIENT_ACK) {
+ if (isClientAck()) {
if (transactionId == null) {
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
} else {
ack.setAckType(MessageAck.DELIVERED_ACK_TYPE);
}
int count = 0;
- for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+ for (Iterator<StompAckEntry> iterator = dispatchedMessage.values().iterator(); iterator.hasNext();) {
+ StompAckEntry entry = iterator.next();
+ MessageId current = entry.getMessageId();
- @SuppressWarnings("rawtypes")
- Map.Entry entry = (Entry)iter.next();
- MessageId id = (MessageId)entry.getKey();
- MessageDispatch msg = (MessageDispatch)entry.getValue();
+ if (entry.getAckId() != null) {
+ pendingAcks.remove(entry.getAckId());
+ }
if (transactionId != null) {
- if (!unconsumedMessage.contains(msg)) {
- unconsumedMessage.add(msg);
+ if (!transactedMessages.contains(entry)) {
+ transactedMessages.add(entry);
count++;
}
} else {
- acknowledgedMessages.add(id.toString());
- iter.remove();
+ iterator.remove();
count++;
}
- if (id.equals(msgId)) {
- ack.setLastMessageId(id);
+ if (current.equals(msgId)) {
+ ack.setLastMessageId(current);
break;
}
}
@@ -178,14 +204,15 @@
if (transactionId != null) {
ack.setTransactionId(transactionId);
}
-
- this.protocolConverter.afterClientAck(this, acknowledgedMessages);
- } else if (ackMode == INDIVIDUAL_ACK) {
+ } else if (isIndividualAck()) {
+ if (ackEntry.getAckId() != null) {
+ pendingAcks.remove(ackEntry.getAckId());
+ }
ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
ack.setMessageID(msgId);
ack.setMessageCount(1);
if (transactionId != null) {
- unconsumedMessage.add(dispatchedMessage.get(msgId));
+ transactedMessages.add(dispatchedMessage.get(msgId));
ack.setTransactionId(transactionId);
} else {
dispatchedMessage.remove(msgId);
@@ -196,23 +223,29 @@
}
public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
-
MessageId msgId = new MessageId(messageId);
if (!dispatchedMessage.containsKey(msgId)) {
return null;
}
+ final StompAckEntry ackEntry = dispatchedMessage.get(msgId);
+
+ if (ackEntry.getAckId() != null) {
+ pendingAcks.remove(ackEntry.getAckId());
+ }
+
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
ack.setAckType(MessageAck.POSION_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
- unconsumedMessage.add(dispatchedMessage.get(msgId));
+ transactedMessages.add(ackEntry);
ack.setTransactionId(transactionId);
+ } else {
+ dispatchedMessage.remove(msgId);
}
- dispatchedMessage.remove(msgId);
return ack;
}
@@ -225,6 +258,18 @@
this.ackMode = ackMode;
}
+ public boolean isAutoAck() {
+ return ackMode.equals(AUTO_ACK);
+ }
+
+ public boolean isClientAck() {
+ return ackMode.equals(CLIENT_ACK);
+ }
+
+ public boolean isIndividualAck() {
+ return ackMode.equals(INDIVIDUAL_ACK);
+ }
+
public String getSubscriptionId() {
return subscriptionId;
}
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index b7560c7..3944c50 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -21,7 +21,9 @@
import java.io.IOException;
import java.net.Socket;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@@ -40,7 +42,6 @@
@Override
public void setUp() throws Exception {
-
super.setUp();
stompConnect();
@@ -70,7 +71,6 @@
@Test(timeout = 60000)
public void testTelnetStyleSends() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "CONNECT\r\n" +
@@ -107,7 +107,6 @@
@Test(timeout = 60000)
public void testClientAckWithoutAckId() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -150,18 +149,40 @@
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
- String frame = "ACK\n" + "message-id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ String ackId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+
+ // Put ACK ID in wrong header
+ String frame = "ACK\n" + "message-id:" + ackId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("ERROR"));
LOG.info("Broker sent: " + received);
+
+ // Now place it in the correct location and check it still works.
+ frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testClientAck() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -255,11 +276,106 @@
frame = "ACK\n" + "id:" +
received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ @Test(timeout = 60000)
+ public void testClientAckMultipleMessagesWithSingleAck() throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "ack:client\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+ assertEquals(MESSAGE_COUNT, getProxyToQueue(getQueueName()).getQueueSize());
+
+ String lastAckId = null;
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ lastAckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
+ }
+
+ String frame = "ACK\n" + "id:" + lastAckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
}
@Test(timeout = 60000)
public void testClientIndividualAck() throws Exception {
-
stompConnection.setVersion(Stomp.V1_2);
String connect = "STOMP\r\n" +
@@ -345,24 +461,117 @@
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("1", received.getBody());
+ String message1AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
- frame = "ACK\n" + "id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ frame = "ACK\n" + "id:" + message1AckId + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
assertEquals("3", received.getBody());
+ String message3AckId = received.getHeaders().get(Stomp.Headers.Message.ACK_ID);
- frame = "ACK\n" + "id:" +
- received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ frame = "ACK\n" + "id:" + message3AckId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
- public void testQueueBrowerSubscription() throws Exception {
+ public void testRepeatedClientIndividualAckInMultipleTransactions() throws Exception {
+ final int MESSAGE_COUNT = 50;
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:1\n" +
+ "ack:client-individual\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Receive all messages, each in their own transaction
+ // Ensure we don't have any errors
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ // Ack & Commit the first message
+ String begin = "BEGIN\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ String frame = "ACK\n" + "transaction:tx" + String.format("%d", n) + "\n" + "id:" +
+ received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ String commit = "COMMIT\n" + "transaction:tx" + String.format("%d", n) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+ }
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testQueueBrowerSubscription() throws Exception {
final int MSG_COUNT = 10;
String connectFrame = "STOMP\n" +
@@ -523,7 +732,6 @@
@Test(timeout = 60000)
public void testSubscribeWithNoId() throws Exception {
-
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
@@ -571,7 +779,7 @@
long usageStart = brokerService.getSystemUsage().getMemoryUsage().getUsage();
- for(int i = 0; i < MSG_COUNT; ++i) {
+ for (int i = 0; i < MSG_COUNT; ++i) {
String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:0\n" +
"myXkProp:" + bigProp + "\n"+
@@ -593,11 +801,372 @@
"id:12345\n" + "browser:true\n\n" + Stomp.NULL;
stompConnection.sendFrame(subscribe);
- for(int i = 0; i < MSG_COUNT; ++i) {
+ for (int i = 0; i < MSG_COUNT; ++i) {
StompFrame message = stompConnection.receive();
assertEquals(Stomp.Responses.MESSAGE, message.getAction());
assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
}
+ }
+ @Test(timeout = 60000)
+ public void testAckMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testNackMessagesAfterTransactionAbortClientIndividualAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(true);
+ }
+
+ private void doTestMessagesRetirementAfterTransactionAbortClientIndividualAckMode(boolean nack) throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:1\n" +
+ "ack:client-individual\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Start a TX that will later be aborted.
+ String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+
+ String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" +
+ received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ } else {
+ String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testAckMessagesAfterTransactionAbortClientAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientAckMode(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testNackMessagesAfterTransactionAbortClientAckMode() throws Exception {
+ doTestMessagesRetirementAfterTransactionAbortClientAckMode(true);
+ }
+
+ private void doTestMessagesRetirementAfterTransactionAbortClientAckMode(boolean nack) throws Exception {
+ final int MESSAGE_COUNT = 10;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+ "ack:client\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ // Start a TX that will later be aborted.
+ String begin = "BEGIN\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(begin);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+ }
+
+ // Client ACK that enlists all messages in the TX
+ String frame = "ACK\n" + "transaction:tx1" + "\n" + "id:" + ackIds.get(MESSAGE_COUNT - 1) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ String commit = "ABORT\n" + "transaction:tx1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(commit);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ } else {
+ frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+ }
+
+ @Test(timeout = 60000)
+ public void testMixedAckNackWithMessageAckIdsClientAck() throws Exception {
+ doTestMixedAckNackWithMessageAckIds(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testMixedAckNackWithMessageAckIdsClientIndividualAck() throws Exception {
+ doTestMixedAckNackWithMessageAckIds(true);
+ }
+
+ public void doTestMixedAckNackWithMessageAckIds(boolean individual) throws Exception {
+
+ final int MESSAGE_COUNT = 20;
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ // Send some messages
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + String.format("%d", n) + Stomp.NULL;
+ stompConnection.sendFrame(message);
+ }
+
+ // Subscribe to the queue
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "activemq.prefetchSize:" + MESSAGE_COUNT + "\n" +
+ "ack:" + (individual ? "client-individual" : "client") + "\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ List<String> ackIds = new ArrayList<>(MESSAGE_COUNT);
+
+ for (int n = 0; n < MESSAGE_COUNT; n++) {
+ StompFrame received = stompConnection.receive();
+ LOG.info("Broker sent: " + received);
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals(String.format("%d", n), received.getBody());
+
+ ackIds.add(received.getHeaders().get(Stomp.Headers.Message.ACK_ID));
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == MESSAGE_COUNT;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ boolean nack = false;
+
+ for (String ackId : ackIds) {
+ if (nack) {
+ String frame = "NACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ nack = !nack;
+ } else {
+ String frame = "ACK\n" + "id:" + ackId + "\n" + "receipt:2" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ nack = !nack;
+ }
+
+ receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("2", receiptId);
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToQueue(getQueueName()).getQueueSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
}
}
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
index 7ded503..5e96385 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
@@ -901,6 +901,7 @@
try {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
+ assertFalse("no stack trace impl leak:" + f, f.contains("at "));
} catch (IOException socketMayBeClosedFirstByBroker) {}
}
@@ -913,6 +914,7 @@
try {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
+ assertFalse("no stack trace impl leak:" + f, f.contains("at "));
} catch (IOException socketMayBeClosedFirstByBroker) {}
}
@@ -930,6 +932,7 @@
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
+ assertFalse("no stack trace impl leak:" + f, f.contains("at "));
}
@Test(timeout = 60000)
@@ -946,6 +949,7 @@
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
+ assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
}
@Test(timeout = 60000)
@@ -964,6 +968,7 @@
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+ assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
}
@Test(timeout = 60000)
diff --git a/activemq-tooling/activemq-maven-plugin/pom.xml b/activemq-tooling/activemq-maven-plugin/pom.xml
index 5588f8a..7f5cfee 100644
--- a/activemq-tooling/activemq-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-maven-plugin/pom.xml
@@ -31,13 +31,8 @@
<dependencies>
<dependency>
<groupId>org.apache.maven</groupId>
- <artifactId>maven-plugin-api</artifactId>
- <version>2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.maven</groupId>
- <artifactId>maven-project</artifactId>
- <version>2.0</version>
+ <artifactId>maven-core</artifactId>
+ <version>${maven-core-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
@@ -72,6 +67,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
index e42b37b..346571b 100644
--- a/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-memtest-maven-plugin/pom.xml
@@ -33,7 +33,7 @@
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
- <version>2.0</version>
+ <version>${maven-core-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
diff --git a/activemq-tooling/activemq-perf-maven-plugin/pom.xml b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
index fde54a7..9490bde 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/pom.xml
+++ b/activemq-tooling/activemq-perf-maven-plugin/pom.xml
@@ -32,7 +32,7 @@
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
- <version>2.0</version>
+ <version>${maven-core-version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
index 8b84798..17ce87f 100644
--- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
+++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/spi/ClassLoaderSPIConnectionFactory.java
@@ -57,14 +57,14 @@
} else {
LOG.info("Adding extension dir: " + f.getAbsolutePath());
- urls.add(f.toURL());
+ urls.add(f.toURI().toURL());
File[] files = f.listFiles();
if (files != null) {
for (int j = 0; j < files.length; j++) {
if (files[j].getName().endsWith(".zip") || files[j].getName().endsWith(".jar")) {
LOG.info("Adding extension dir: " + files[j].getAbsolutePath());
- urls.add(files[j].toURL());
+ urls.add(files[j].toURI().toURL());
}
}
}
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 6ef398d..d63fed8 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -274,18 +274,6 @@
<artifactId>apacheds-core-integ</artifactId>
<version>${directory-version}</version>
<scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>bouncycastle</groupId>
- <artifactId>bcprov-jdk15</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.bouncycastle</groupId>
- <artifactId>bcprov-jdk15</artifactId>
- <version>1.46</version>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.directory.server</groupId>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 80d45eb..89745a9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -1085,15 +1085,14 @@
consumerInfos.add(consumerInfo);
}
- for (ConsumerInfo info : consumerInfos) {
- connection.send(info);
- }
-
Message message = null;
for (ConsumerInfo info : consumerInfos) {
+ // one by one registration to avoid ordering issue with concurrent dispatch from composite dests broker side
+ connection.request(info);
for (int i = 0; i < numMessages; i++) {
message = receiveMessage(connection);
assertNotNull(message);
+ LOG.info("ORIG " + message.getMessageId());
connection.send(createAck(info, message, 1, MessageAck.DELIVERED_ACK_TYPE));
}
MessageAck ack = createAck(info, message, numMessages, MessageAck.STANDARD_ACK_TYPE);
@@ -1574,6 +1573,7 @@
protected void configureBroker(BrokerService broker) throws Exception {
super.configureBroker(broker);
broker.setKeepDurableSubsActive(keepDurableSubsActive);
+ maxWait = 2000;
}
public static Test suite() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
index 0739070..6a061c7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -35,8 +35,14 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.ProducerThread;
import org.apache.activemq.util.Wait;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,6 +207,69 @@
@Test
public void testScheduleRestart() throws Exception {
+ testScheduleRestart(RestartType.NORMAL);
+ }
+
+ @Test
+ public void testScheduleFullRecoveryRestart() throws Exception {
+ testScheduleRestart(RestartType.FULL_RECOVERY);
+ }
+
+ @Test
+ public void testUpdatesAppliedToIndexBeforeJournalShouldBeDiscarded() throws Exception {
+ final int NUMBER_OF_MESSAGES = 1000;
+ final AtomicInteger numberOfDiscardedJobs = new AtomicInteger();
+ final JobSchedulerStoreImpl jobSchedulerStore = (JobSchedulerStoreImpl) broker.getJobSchedulerStore();
+ Location middleLocation = null;
+
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getMessage().toString().contains("Removed Job past last appened in the journal")) {
+ numberOfDiscardedJobs.incrementAndGet();
+ }
+ }
+ };
+
+ registerLogAppender(appender);
+
+ // send a messages
+ Connection connection = createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = session.createTextMessage("test msg");
+ long time = 5000;
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+ producer.send(message);
+
+ if (NUMBER_OF_MESSAGES / 2 == i) {
+ middleLocation = jobSchedulerStore.getJournal().getLastAppendLocation();
+ }
+ }
+
+ producer.close();
+
+ broker.stop();
+ broker.waitUntilStopped();
+
+ // Simulating the case here updates got applied on the index before the journal updates
+ jobSchedulerStore.getJournal().setLastAppendLocation(middleLocation);
+ jobSchedulerStore.load();
+
+ assertEquals(numberOfDiscardedJobs.get(), NUMBER_OF_MESSAGES / 2);
+ }
+
+ private void registerLogAppender(final Appender appender) {
+ org.apache.log4j.Logger log4jLogger =
+ org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class);
+ log4jLogger.addAppender(appender);
+ log4jLogger.setLevel(Level.TRACE);
+ }
+
+ private void testScheduleRestart(final RestartType restartType) throws Exception {
// send a message
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -213,12 +282,7 @@
producer.close();
//restart broker
- broker.stop();
- broker.waitUntilStopped();
-
- broker = createBroker(false);
- broker.start();
- broker.waitUntilStarted();
+ restartBroker(restartType);
// consume the message
connection = createConnection();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
index 6f6dc76..b1f3b2e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerManagementTest.java
@@ -23,6 +23,19 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
+import org.apache.log4j.Level;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -44,13 +57,25 @@
private static final transient Logger LOG = LoggerFactory.getLogger(JobSchedulerManagementTest.class);
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ BrokerService brokerService = createBroker(true);
+ if (isPersistent()) {
+ ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setCleanupInterval(500);
+ ((JobSchedulerStoreImpl) brokerService.getJobSchedulerStore()).setJournalMaxFileLength(100 * 1024);
+ }
+ return brokerService;
+ }
+
@Test
public void testRemoveAllScheduled() throws Exception {
- final int COUNT = 5;
+ org.apache.log4j.Logger.getLogger(Transaction.class).setLevel(Level.DEBUG);
+ final int COUNT = 5000;
+ System.setProperty("maxKahaDBTxSize", "" + (500*1024));
Connection connection = createConnection();
// Setup the scheduled Message
- scheduleMessage(connection, TimeUnit.SECONDS.toMillis(6), COUNT);
+ scheduleMessage(connection, TimeUnit.SECONDS.toMillis(180), COUNT);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -80,6 +105,10 @@
latch.await(10, TimeUnit.SECONDS);
assertEquals(latch.getCount(), COUNT);
+ if (isPersistent()) {
+ assertEquals(1, getNumberOfJournalFiles());
+ }
+
connection.close();
}
@@ -423,4 +452,15 @@
producer.close();
}
+
+ private int getNumberOfJournalFiles() throws IOException, InterruptedException {
+ Collection<DataFile> files = ((JobSchedulerStoreImpl) broker.getJobSchedulerStore()).getJournal().getFileMap().values();
+ int reality = 0;
+ for (DataFile file : files) {
+ if (file != null) {
+ reality++;
+ }
+ }
+ return reality;
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
index 5bf8d8c..acfd1ba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerTestSupport.java
@@ -41,6 +41,11 @@
@Rule public TestName name = new TestName();
+ enum RestartType {
+ NORMAL,
+ FULL_RECOVERY
+ }
+
protected String connectionUri;
protected BrokerService broker;
protected JobScheduler jobScheduler;
@@ -113,4 +118,22 @@
answer.setUseJmx(isUseJmx());
return answer;
}
+
+ protected void restartBroker(RestartType restartType) throws Exception {
+ tearDown();
+
+ if (restartType == RestartType.FULL_RECOVERY) {
+ File dir = broker.getSchedulerDirectoryFile();
+
+ if (dir != null) {
+ IOHelper.deleteFile(new File(dir, "scheduleDB.data"));
+ IOHelper.deleteFile(new File(dir, "scheduleDB.redo"));
+ }
+ }
+
+ broker = createBroker(false);
+
+ broker.start();
+ broker.waitUntilStarted();
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java
new file mode 100644
index 0000000..c79a536
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7270Test.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class AMQ7270Test extends TestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ7270Test.class);
+ final int messageCount = 150;
+ final int messageSize = 1024*1024;
+ final int maxPageSize = 50;
+ final ActiveMQQueue activeMQQueue = new ActiveMQQueue("BIG");
+ BrokerService broker;
+ ActiveMQConnectionFactory factory;
+
+ protected void configureBroker() throws Exception {
+ broker.setPersistent(false);
+ broker.setAdvisorySupport(false);
+
+ PolicyMap pMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ // disable expriy processing as this will call browse in parallel
+ policy.setExpireMessagesPeriod(0);
+ policy.setMaxPageSize(maxPageSize);
+ pMap.setDefaultEntry(policy);
+
+ broker.setDestinationPolicy(pMap);
+ }
+
+ public void testConcurrentCopyMatchingPageSizeOk() throws Exception {
+
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(activeMQQueue);
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ BytesMessage bytesMessage = session.createBytesMessage();
+
+ for (int i = 0; i < messageCount; i++) {
+ bytesMessage.setIntProperty("id", i);
+ producer.send(activeMQQueue, bytesMessage);
+ }
+
+ final QueueViewMBean queueViewMBean = (QueueViewMBean)
+ broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
+
+ LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount());
+
+ connection.close();
+
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ for (int i=0; i<20; i++) {
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+
+ try {
+ // only match the last to require pageIn
+ queueViewMBean.copyMatchingMessagesTo("id=" + (messageCount - 1), "To");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+ assertTrue("all work done", executor.awaitTermination(30, TimeUnit.SECONDS));
+
+ final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(activeMQQueue);
+ assertEquals("page Size as expected " + underTest, maxPageSize, underTest.getMaxPageSize());
+ }
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ broker = new BrokerService();
+ broker.setBrokerName("thisOne");
+ configureBroker();
+ broker.start();
+ factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+ factory.setWatchTopicAdvisories(false);
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ broker = null;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
new file mode 100644
index 0000000..da96431
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/MKahaDBTxRecoveryTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TransactionIdTransformer;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.*;
+
+public class MKahaDBTxRecoveryTest {
+
+ static final Logger LOG = LoggerFactory.getLogger(MKahaDBTxRecoveryTest.class);
+ private final static int maxFileLength = 1024*1024*32;
+
+ private final static String PREFIX_DESTINATION_NAME = "queue";
+
+ private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
+ private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
+ private final static int CLEANUP_INTERVAL_MILLIS = 500;
+
+ BrokerService broker;
+ private List<KahaDBPersistenceAdapter> kahadbs = new LinkedList<KahaDBPersistenceAdapter>();
+
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.setBrokerName("localhost");
+ broker.setPersistenceAdapter(kaha);
+ return broker;
+ }
+
+ @Test
+ public void testCommitOutcomeDeliveryOnRecovery() throws Exception {
+
+ prepareBrokerWithMultiStore(true);
+ broker.start();
+ broker.waitUntilStarted();
+
+
+ // Ensure we have an Admin View.
+ assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return (broker.getAdminView()) != null;
+ }
+ }));
+
+
+ final AtomicBoolean injectFailure = new AtomicBoolean(true);
+
+ final AtomicInteger reps = new AtomicInteger();
+ final AtomicReference<TransactionIdTransformer> delegate = new AtomicReference<TransactionIdTransformer>();
+
+ TransactionIdTransformer faultInjector = new TransactionIdTransformer() {
+ @Override
+ public TransactionId transform(TransactionId txid) {
+ if (injectFailure.get() && reps.incrementAndGet() > 5) {
+ throw new RuntimeException("Bla");
+ }
+ return delegate.get().transform(txid);
+ }
+ };
+ // set up kahadb to fail after N ops
+ for (KahaDBPersistenceAdapter pa : kahadbs) {
+ if (delegate.get() == null) {
+ delegate.set(pa.getStore().getTransactionIdTransformer());
+ }
+ pa.setTransactionIdTransformer(faultInjector);
+ }
+
+ ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+ f.setAlwaysSyncSend(true);
+ Connection c = f.createConnection();
+ c.start();
+ Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = s.createProducer(new ActiveMQQueue(DESTINATION_NAME + "," + DESTINATION_NAME_2));
+ producer.send(s.createTextMessage("HI"));
+ try {
+ s.commit();
+ } catch (Exception expected) {
+ expected.printStackTrace();
+ }
+
+ assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+ assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+
+ final Destination destination1 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+ final Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+
+ assertTrue("Partial commit - one dest has message", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return destination2.getMessageStore().getMessageCount() != destination1.getMessageStore().getMessageCount();
+ }
+ }));
+
+ // check completion on recovery
+ injectFailure.set(false);
+
+ // fire in many more local transactions to use N txStore journal files
+ for (int i=0; i<100; i++) {
+ producer.send(s.createTextMessage("HI"));
+ s.commit();
+ }
+
+ broker.stop();
+
+ // fail recovery processing on first attempt
+ prepareBrokerWithMultiStore(false);
+ broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+
+ @Override
+ public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
+ // longer than CleanupInterval
+ TimeUnit.SECONDS.sleep( 2);
+ throw new RuntimeException("Sorry");
+ }
+ }});
+ broker.start();
+
+ // second recovery attempt should sort it
+ broker.stop();
+ prepareBrokerWithMultiStore(false);
+ broker.start();
+ broker.waitUntilStarted();
+
+ // verify commit completed
+ Destination destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME));
+ assertEquals(101, destination.getMessageStore().getMessageCount());
+
+ destination = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+ assertEquals(101, destination.getMessageStore().getMessageCount());
+ }
+
+
+ protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+ KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+ kaha.setJournalMaxFileLength(maxFileLength);
+ kaha.setCleanupInterval(CLEANUP_INTERVAL_MILLIS);
+ if (delete) {
+ kaha.deleteAllMessages();
+ }
+ kahadbs.add(kaha);
+ return kaha;
+ }
+
+ public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+ MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+ if (deleteAllMessages) {
+ multiKahaDBPersistenceAdapter.deleteAllMessages();
+ }
+ ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+ adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
+ adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
+
+ multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+ multiKahaDBPersistenceAdapter.setJournalMaxFileLength(4*1024);
+ multiKahaDBPersistenceAdapter.setJournalCleanupInterval(10);
+
+ broker = createBroker(multiKahaDBPersistenceAdapter);
+ }
+
+ private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+ throws IOException {
+ FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+ template.setPersistenceAdapter(createStore(deleteAllMessages));
+ if (destinationPrefix != null) {
+ template.setQueue(destinationPrefix + ".>");
+ }
+ return template;
+ }
+}
\ No newline at end of file
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 4b8942b..aade6d3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.network;
+import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +32,8 @@
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics;
@@ -227,5 +231,25 @@
}, 10000, 500));
}
+ protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, final int count) {
+ assertNotNull(networkBridge);
+ DemandForwardingBridgeSupport bridge = (DemandForwardingBridgeSupport) networkBridge;
+ assertEquals(count, bridge.subscriptionMapByLocalId.size());
+ assertEquals(count, bridge.subscriptionMapByRemoteId.size());
+ }
+
+ protected DemandForwardingBridge findDuplexBridge(final TransportConnector connector) throws Exception {
+ assertNotNull(connector);
+
+ for (TransportConnection tc : connector.getConnections()) {
+ if (tc.getConnectionId().startsWith("networkConnector_")) {
+ final Field bridgeField = TransportConnection.class.getDeclaredField("duplexBridge");
+ bridgeField.setAccessible(true);
+ return (DemandForwardingBridge) bridgeField.get(tc);
+ }
+ }
+
+ return null;
+ }
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
index c5899a0..800c103 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.network;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import java.lang.reflect.Field;
import java.net.URI;
import javax.jms.JMSException;
@@ -28,15 +30,18 @@
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
+import scala.annotation.bridge;
/**
* This test is to show that if a durable subscription over a network bridge is deleted and
@@ -106,7 +111,7 @@
});
}
- public void testReceive(BrokerService receiveBroker, Session receiveSession,
+ protected void testReceive(BrokerService receiveBroker, Session receiveSession,
BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception {
final DestinationStatistics destinationStatistics =
@@ -118,6 +123,17 @@
waitForConsumerCount(destinationStatistics, 1);
+ final NetworkBridge bridge;
+ if (publishBroker.getNetworkConnectors().size() > 0) {
+ Wait.waitFor(() -> publishBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 500);
+ bridge = publishBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ } else {
+ bridge = findDuplexBridge(publishBroker.getTransportConnectorByScheme("tcp"));
+ }
+
+ //Should be 2 - one for the durable destination and one for the advisory destinations
+ assertSubscriptionMapCounts(bridge, 2);
+
//remove the durable
final ConnectionContext context = new ConnectionContext();
RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker);
@@ -126,6 +142,9 @@
receiveBroker.getBroker().removeSubscription(context, info);
waitForConsumerCount(destinationStatistics, 0);
+ //Should be 1 - 0 for the durable destination and one for the advisory destinations
+ assertSubscriptionMapCounts(bridge, 1);
+
//re-create consumer
MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer();
waitForConsumerCount(destinationStatistics, 1);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
index 86165f5..fd4ffe7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationMapLegacyTest.java
@@ -38,14 +38,14 @@
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jaas.GroupPrincipal;
import org.apache.activemq.util.Wait;
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.name.Rdn;
+import org.apache.directory.api.ldap.model.ldif.LdifEntry;
+import org.apache.directory.api.ldap.model.ldif.LdifReader;
+import org.apache.directory.api.ldap.model.message.ModifyRequest;
+import org.apache.directory.api.ldap.model.message.ModifyRequestImpl;
import org.apache.directory.ldap.client.api.LdapConnection;
import org.apache.directory.server.core.integ.AbstractLdapTestUnit;
-import org.apache.directory.shared.ldap.model.ldif.LdifEntry;
-import org.apache.directory.shared.ldap.model.ldif.LdifReader;
-import org.apache.directory.shared.ldap.model.message.ModifyRequest;
-import org.apache.directory.shared.ldap.model.message.ModifyRequestImpl;
-import org.apache.directory.shared.ldap.model.name.Dn;
-import org.apache.directory.shared.ldap.model.name.Rdn;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
index 3f52ed3..b285571 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/AbstractCachedLDAPAuthorizationModuleTest.java
@@ -18,7 +18,7 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jaas.UserPrincipal;
-import org.apache.directory.shared.ldap.model.message.ModifyRequest;
+import org.apache.directory.api.ldap.model.message.ModifyRequest;
import org.junit.Test;
import java.util.Set;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
index 5c2764a..d6c366d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyOpenLDAPTest.java
@@ -19,10 +19,10 @@
import java.io.IOException;
import java.io.InputStream;
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
import org.apache.directory.ldap.client.api.LdapConnection;
import org.apache.directory.ldap.client.api.LdapNetworkConnection;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
index f696cb3..aa5f232 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleLegacyTest.java
@@ -16,14 +16,14 @@
*/
package org.apache.activemq.security;
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
import org.apache.directory.ldap.client.api.LdapConnection;
import org.apache.directory.ldap.client.api.LdapNetworkConnection;
import org.apache.directory.server.annotations.CreateLdapServer;
import org.apache.directory.server.annotations.CreateTransport;
import org.apache.directory.server.core.annotations.ApplyLdifFiles;
import org.apache.directory.server.core.integ.FrameworkRunner;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
import org.junit.runner.RunWith;
import java.io.IOException;
@@ -41,6 +41,7 @@
protected SimpleCachedLDAPAuthorizationMap createMap() {
SimpleCachedLDAPAuthorizationMap map = super.createMap();
map.setConnectionURL("ldap://localhost:" + getLdapServer().getPort());
+ map.setConnectionPassword("secret");
return map;
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
index 6d0ca93..e1a0db2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleOpenLDAPTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.activemq.security;
+import org.apache.directory.api.ldap.model.name.Dn;
+import org.apache.directory.api.ldap.model.exception.LdapException;
import org.apache.directory.ldap.client.api.LdapConnection;
import org.apache.directory.ldap.client.api.LdapNetworkConnection;
-import org.apache.directory.shared.ldap.model.exception.LdapException;
-import org.apache.directory.shared.ldap.model.name.Dn;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
index 5d6f2e7..e2a280b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/CachedLDAPAuthorizationModuleTest.java
@@ -16,13 +16,13 @@
*/
package org.apache.activemq.security;
+import org.apache.directory.api.ldap.model.name.Dn;
import org.apache.directory.ldap.client.api.LdapConnection;
import org.apache.directory.ldap.client.api.LdapNetworkConnection;
import org.apache.directory.server.annotations.CreateLdapServer;
import org.apache.directory.server.annotations.CreateTransport;
import org.apache.directory.server.core.annotations.ApplyLdifFiles;
import org.apache.directory.server.core.integ.FrameworkRunner;
-import org.apache.directory.shared.ldap.model.name.Dn;
import org.junit.runner.RunWith;
import java.io.InputStream;
@@ -39,6 +39,7 @@
protected SimpleCachedLDAPAuthorizationMap createMap() {
SimpleCachedLDAPAuthorizationMap map = super.createMap();
map.setConnectionURL("ldap://localhost:" + getLdapServer().getPort());
+ map.setConnectionPassword("secret");
return map;
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
index 130a0da..f4fa851 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/LDAPAuthorizationMapTest.java
@@ -65,6 +65,7 @@
authMap.setQueueSearchMatchingFormat(new MessageFormat("uid={0},ou=queues,ou=destinations,o=ActiveMQ,ou=system"));
authMap.setAdvisorySearchBase("uid=ActiveMQ.Advisory,ou=topics,ou=destinations,o=ActiveMQ,ou=system");
authMap.setTempSearchBase("uid=ActiveMQ.Temp,ou=topics,ou=destinations,o=ActiveMQ,ou=system");
+ authMap.setConnectionPassword("secret");
}
@Test
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
index 76681c6..9ca43c9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/security/TextFileCertificateLoginModuleTest.java
@@ -38,6 +38,7 @@
private static final String CERT_USERS_FILE_SMALL = "cert-users-SMALL.properties";
private static final String CERT_USERS_FILE_LARGE = "cert-users-LARGE.properties";
+ private static final String CERT_USERS_FILE_REGEXP = "cert-users-REGEXP.properties";
private static final String CERT_GROUPS_FILE = "cert-groups.properties";
private static final Logger LOG = LoggerFactory.getLogger(TextFileCertificateLoginModuleTest.class);
@@ -76,6 +77,11 @@
loginTest(CERT_USERS_FILE_LARGE, CERT_GROUPS_FILE);
}
+ @Test
+ public void testLoginWithREGEXPUsersFile() throws Exception {
+ loginTest(CERT_USERS_FILE_REGEXP, CERT_GROUPS_FILE);
+ }
+
private void loginTest(String usersFiles, String groupsFile) throws LoginException {
HashMap options = new HashMap<String, String>();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java
new file mode 100644
index 0000000..e4f6196
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/EmptyTransactionTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MessageDatabase;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.experimental.theories.Theories;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class EmptyTransactionTest extends TestCase {
+
+ private static final int CHECKPOINT_INTERVAL = 500;
+ private BrokerService broker;
+
+ public void testEmptyTransactionsCheckpoint() throws Exception {
+
+ AtomicBoolean hadRecovery = new AtomicBoolean(false);
+ DefaultTestAppender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getMessage().toString().contains("Recovering from the journal @")) {
+ hadRecovery.set(true);
+ }
+ }
+ };
+
+ org.apache.log4j.Logger.getLogger(MessageDatabase.class).addAppender(appender);
+
+ start(true);
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(new ActiveMQQueue("QueueName"));
+ producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ sendMessage(session, producer);
+
+ // wait checkpoint
+ // When we create a new consumer a KahaProducerAuditCommand written to the journal files changing the lastUpdate pointer
+ Thread.sleep(CHECKPOINT_INTERVAL * 2);
+
+ for (int i = 0; i < 5; i++) {
+ sendMessage(session, producer);
+ }
+
+ restart();
+
+ assertFalse(hadRecovery.get());
+ }
+
+ private void sendMessage(final Session session, final MessageProducer producer) throws JMSException {
+ TextMessage m = session.createTextMessage("Hi");
+ producer.send(m);
+ session.commit();
+ }
+
+ private void restart() throws Exception {
+ broker.stop();
+ broker.waitUntilStopped();
+ start(false);
+ }
+
+ private void start(final boolean deleteMessages) throws Exception {
+ broker = new BrokerService();
+ KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter();
+ kahaDB.setCheckpointInterval(CHECKPOINT_INTERVAL);
+ broker.setPersistenceAdapter(kahaDB);
+ broker.setPersistent(true);
+ broker.setDeleteAllMessagesOnStartup(deleteMessages);
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java
new file mode 100644
index 0000000..5c7de8e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingDisableConcurrentStoreAndDispatchTest.java
@@ -0,0 +1,18 @@
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import java.io.File;
+import java.io.IOException;
+
+public class QueueBrowsingDisableConcurrentStoreAndDispatchTest extends QueueBrowsingTest {
+ @Override
+ public BrokerService createBroker() throws IOException {
+ BrokerService broker = super.createBroker();
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setConcurrentStoreAndDispatchQueues(false);
+ broker.setPersistenceAdapter(kahadb);
+ return broker;
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
index 3e2d067..d2e2e67 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingTest.java
@@ -201,6 +201,10 @@
producer.send(session.createTextMessage(data));
}
+ //Consume one message to free memory and allow the cursor to pageIn messages
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.receive(1000);
+
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
index 76f54ff..9beb4d6 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicProducerFlowControlTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.usecases;
+import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -33,9 +34,11 @@
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.activemq.util.Wait;
@@ -57,6 +60,8 @@
private BrokerService broker;
protected void setUp() throws Exception {
+ produced.set(0);
+ consumed.set(0);
// Setup and start the broker
broker = new BrokerService();
broker.setBrokerName(brokerName);
@@ -202,6 +207,119 @@
}
}
+
+ public void testTransactedProducerBlockedAndClosedWillRelease() throws Exception {
+ doTestTransactedProducerBlockedAndClosedWillRelease(false);
+ }
+
+ public void testTransactedSyncSendProducerBlockedAndClosedWillRelease() throws Exception {
+ doTestTransactedProducerBlockedAndClosedWillRelease(true);
+ }
+
+ public void doTestTransactedProducerBlockedAndClosedWillRelease(final boolean alwaysSyncSend) throws Exception {
+
+ // Create the connection factory
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
+ connectionFactory.setWatchTopicAdvisories(false);
+ connectionFactory.setAlwaysSyncSend(alwaysSyncSend);
+ Connection c = connectionFactory.createConnection();
+ c.start();
+
+
+ ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+ prefetchPolicy.setAll(5000);
+ connectionFactory.setPrefetchPolicy(prefetchPolicy);
+ // Start the test destination listener
+ Session listenerSession = c.createSession(false, 1);
+ Destination destination = createDestination(listenerSession);
+
+
+ final AtomicInteger warnings = new AtomicInteger();
+ Appender appender = new DefaultTestAppender() {
+ @Override
+ public void doAppend(LoggingEvent event) {
+ if (event.getLevel().equals(Level.WARN) && event.getMessage().toString().contains("Usage Manager memory limit reached")) {
+ LOG.info("received log message: " + event.getMessage());
+ warnings.incrementAndGet();
+ }
+ }
+ };
+ org.apache.log4j.Logger log4jLogger =
+ org.apache.log4j.Logger.getLogger(Topic.class);
+ log4jLogger.addAppender(appender);
+ try {
+
+ // Start producing the test messages
+ final Session session = connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
+ final MessageProducer producer = session.createProducer(destination);
+
+ Thread producingThread = new Thread("Producing Thread") {
+ public void run() {
+ try {
+ for (long i = 0; i < numMessagesToSend; i++) {
+ producer.send(session.createTextMessage("test"));
+
+ long count = produced.incrementAndGet();
+ if (count % 10000 == 0) {
+ LOG.info("Produced " + count + " messages");
+ }
+ }
+ } catch (Throwable ex) {
+ ex.printStackTrace();
+ } finally {
+ try {
+ producer.close();
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+ };
+
+ producingThread.start();
+
+
+ assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return warnings.get() > 0;
+ }
+ }, 5 * 1000));
+
+
+ LOG.info("Produced: " + produced.get() + ", Warnings:" + warnings.get());
+
+ assertTrue("Producer got blocked", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return warnings.get() > 0;
+ }
+ }, 5 * 1000));
+
+
+ final long enqueueCountWhenBlocked = broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount();
+
+ // now whack the hung connection broker side (mimic jmx), and verify usage gone b/c of rollback
+ for (TransportConnection transportConnection : broker.getTransportConnectors().get(0).getConnections()) {
+ transportConnection.serviceException(new IOException("forcing close for hung connection"));
+ }
+
+ assertTrue("Usage gets released on close", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ LOG.info("Usage: " + broker.getSystemUsage().getMemoryUsage().getUsage());
+
+ return broker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+ }
+ }, 5 * 1000));
+
+ c.close();
+
+ // verify no pending sends completed in rolledback tx
+ assertEquals("nothing sent during close", enqueueCountWhenBlocked, broker.getDestination(ActiveMQDestination.transform(destination)).getDestinationStatistics().getEnqueues().getCount());
+
+ } finally {
+ log4jLogger.removeAppender(appender);
+ }
+ }
+
protected Destination createDestination(Session listenerSession) throws Exception {
return new ActiveMQTopic("test");
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java
new file mode 100644
index 0000000..48ff54d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest.java
@@ -0,0 +1,17 @@
+package org.apache.activemq.usecases;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+import java.io.IOException;
+
+public class UsageBlockedDispatchConcurrentStoreAndDispatchFalseTest extends UsageBlockedDispatchTest {
+ @Override
+ protected BrokerService createBroker() throws IOException {
+ BrokerService broker = new BrokerService();
+ KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+ kahadb.setConcurrentStoreAndDispatchQueues(false);
+ broker.setPersistenceAdapter(kahadb);
+ return broker;
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
index 7767672..dba73c1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/UsageBlockedDispatchTest.java
@@ -32,6 +32,7 @@
import javax.jms.*;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -48,14 +49,13 @@
@Override
public void setUp() throws Exception {
- broker = new BrokerService();
+ broker = createBroker();
broker.setDataDirectory("target" + File.separator + "activemq-data");
broker.setPersistent(true);
broker.setUseJmx(true);
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(true);
- setDefaultPersistenceAdapter(broker);
SystemUsage sysUsage = broker.getSystemUsage();
sysUsage.getMemoryUsage().setLimit(100*1024);
@@ -75,6 +75,12 @@
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
}
+ protected BrokerService createBroker() throws IOException {
+ BrokerService broker = new BrokerService();
+ setDefaultPersistenceAdapter(broker);
+ return broker;
+ }
+
@Override
public void tearDown() throws Exception {
if (broker != null) {
@@ -125,8 +131,7 @@
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(willGetAPage);
- Message m = consumer.receive(messageReceiveTimeout);
- assertNotNull("got a message", m);
+ consumer.receive(messageReceiveTimeout);
final AtomicBoolean gotExpectedLogEvent = new AtomicBoolean(false);
Appender appender = new DefaultTestAppender() {
@@ -144,7 +149,7 @@
MessageConsumer noDispatchConsumer = consumerSession.createConsumer(shouldBeStuckForDispatch);
- m = noDispatchConsumer.receive(messageReceiveTimeout);
+ Message m = noDispatchConsumer.receive(messageReceiveTimeout);
assertNull("did not get a message", m);
assertTrue("Got the new warning about the blocked cursor", gotExpectedLogEvent.get());
diff --git a/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties b/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties
new file mode 100644
index 0000000..c422738
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/cert-users-REGEXP.properties
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+CNODD=/DN=TEST_USER_\\d*[13579]/
+CNEVEN=/DN=TEST_USER_\\d*[02468]/
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
index 911acba..a2e1ea3 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds-legacy.xml
@@ -39,7 +39,7 @@
<authorizationPlugin>
<map>
- <cachedLDAPAuthorizationMap connectionURL="ldap://localhost:${ldapPort}"/>
+ <cachedLDAPAuthorizationMap connectionURL="ldap://localhost:${ldapPort}" connectionPassword="secret" />
</map>
</authorizationPlugin>
</plugins>
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
index 67768c1..f684ee1 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
@@ -39,7 +39,7 @@
<authorizationPlugin>
<map>
- <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}" groupClass="org.apache.activemq.jaas.GroupPrincipal"/>
+ <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}" groupClass="org.apache.activemq.jaas.GroupPrincipal" connectionPassword="secret" />
</map>
</authorizationPlugin>
</plugins>
diff --git a/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java b/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
index 276f3be..9b67be7 100644
--- a/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
+++ b/activemq-web-console/src/main/java/org/apache/activemq/web/config/OsgiConfiguration.java
@@ -34,12 +34,12 @@
private ServiceRegistration service;
private String jmxUrl = "service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root";
- private String jmxUser = "karaf";
- private String jmxPassword = "karaf";
+ private String jmxUser;
+ private String jmxPassword;
private String jmsUrl = "tcp://localhost:61616";
- private String jmsUser = "karaf";
- private String jmsPassword = "karaf";
+ private String jmsUser;
+ private String jmsPassword;
public OsgiConfiguration() {
diff --git a/activemq-web-console/src/main/webapp/login.html b/activemq-web-console/src/main/webapp/login.html
index 393bcb8..2801e4a 100644
--- a/activemq-web-console/src/main/webapp/login.html
+++ b/activemq-web-console/src/main/webapp/login.html
@@ -105,7 +105,7 @@
<div class="footer_l">
<div class="footer_r">
<div>
- Copyright 2005-2013 The Apache Software Foundation.
+ Copyright 2005-2019 The Apache Software Foundation.
(<a href="?printable=true">printable version</a>)
</div>
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 30ea042..5380de8 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -90,16 +90,12 @@
<!-- Rome RSS Reader -->
<dependency>
- <groupId>rome</groupId>
+ <groupId>com.rometools</groupId>
<artifactId>rome</artifactId>
</dependency>
<!-- XML -->
<dependency>
- <groupId>jdom</groupId>
- <artifactId>jdom</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
</dependency>
@@ -182,4 +178,4 @@
</build>
</profile>
</profiles>
-</project>
\ No newline at end of file
+</project>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
index 1b8897c..38546d7 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
@@ -66,7 +66,7 @@
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
private long requestTimeout = 1000;
- private String defaultContentType = "application/xml";
+ private String defaultContentType;
private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
@@ -285,15 +285,16 @@
response.setHeader("Pragma", "no-cache"); // HTTP 1.0
response.setDateHeader("Expires", 0);
-
// Set content type as in request. This should be done before calling getWriter by specification
- String type = request.getContentType();
+ String type = getContentType(request);
if (type != null) {
response.setContentType(type);
} else {
- if (isXmlContent(message)) {
+ if (defaultContentType != null) {
response.setContentType(defaultContentType);
+ } else if (isXmlContent(message)) {
+ response.setContentType("application/xml");
} else {
response.setContentType("text/plain");
}
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java b/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
index d677e93..21b7a2e 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/view/RssMessageRenderer.java
@@ -29,14 +29,14 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import com.sun.syndication.feed.synd.SyndContent;
-import com.sun.syndication.feed.synd.SyndContentImpl;
-import com.sun.syndication.feed.synd.SyndEntry;
-import com.sun.syndication.feed.synd.SyndEntryImpl;
-import com.sun.syndication.feed.synd.SyndFeed;
-import com.sun.syndication.feed.synd.SyndFeedImpl;
-import com.sun.syndication.io.FeedException;
-import com.sun.syndication.io.SyndFeedOutput;
+import com.rometools.rome.feed.synd.SyndContent;
+import com.rometools.rome.feed.synd.SyndContentImpl;
+import com.rometools.rome.feed.synd.SyndEntry;
+import com.rometools.rome.feed.synd.SyndEntryImpl;
+import com.rometools.rome.feed.synd.SyndFeed;
+import com.rometools.rome.feed.synd.SyndFeedImpl;
+import com.rometools.rome.io.FeedException;
+import com.rometools.rome.io.SyndFeedOutput;
/**
* This renderer uses XStream to render messages on a queue as full XML elements
@@ -53,7 +53,6 @@
public void renderMessage(PrintWriter writer, HttpServletRequest request, HttpServletResponse response, QueueBrowser browser, Message message) throws JMSException {
SyndFeed feed = getFeed(browser, request);
-
List<SyndEntry> entries = feed.getEntries();
SyndEntry entry = createEntry(browser, message, request);
SyndContent description = createEntryContent(browser, message, request);
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index 8c707c8..d9bf1de 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -283,8 +283,9 @@
<include>org.eclipse.jetty.orbit:org.eclipse.jdt.core</include>
<!-- Atom/RSS support -->
- <include>rome:rome</include>
- <include>jdom:jdom</include>
+ <include>com.rometools:rome</include>
+ <include>com.rometools:rome-utils</include>
+ <include>org.jdom:jdom2</include>
<!-- REST API -->
<include>org.jolokia:jolokia-core</include>
diff --git a/assembly/src/release/conf/jetty.xml b/assembly/src/release/conf/jetty.xml
index 95f83d1..0e44442 100644
--- a/assembly/src/release/conf/jetty.xml
+++ b/assembly/src/release/conf/jetty.xml
@@ -54,6 +54,16 @@
<property name="name" value="X-FRAME-OPTIONS"/>
<property name="value" value="SAMEORIGIN"/>
</bean>
+ <bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
+ <property name="pattern" value="*"/>
+ <property name="name" value="X-XSS-Protection"/>
+ <property name="value" value="1; mode=block"/>
+ </bean>
+ <bean id="header" class="org.eclipse.jetty.rewrite.handler.HeaderPatternRule">
+ <property name="pattern" value="*"/>
+ <property name="name" value="X-Content-Type-Options"/>
+ <property name="value" value="nosniff"/>
+ </bean>
</list>
</property>
</bean>
@@ -172,4 +182,4 @@
</bean>
-</beans>
\ No newline at end of file
+</beans>
diff --git a/pom.xml b/pom.xml
index 4bdff14..7a3e65b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,40 +47,40 @@
<aries-version>1.1.0</aries-version>
<aries-transaction-version>1.1.1</aries-transaction-version>
<axion-version>1.0-M3-dev</axion-version>
- <camel-version>2.22.0</camel-version>
+ <camel-version>2.24.1</camel-version>
<camel-version-range>[2.20,3)</camel-version-range>
<cglib-version>2.2</cglib-version>
- <commons-beanutils-version>1.9.3</commons-beanutils-version>
+ <commons-beanutils-version>1.9.4</commons-beanutils-version>
<commons-collections-version>3.2.2</commons-collections-version>
<commons-daemon-version>1.0.15</commons-daemon-version>
- <commons-dbcp2-version>2.1.1</commons-dbcp2-version>
+ <commons-dbcp2-version>2.6.0</commons-dbcp2-version>
<commons-io-version>2.6</commons-io-version>
<commons-lang-version>2.6</commons-lang-version>
<commons-logging-version>1.2</commons-logging-version>
- <commons-pool2-version>2.4.2</commons-pool2-version>
+ <commons-pool2-version>2.6.2</commons-pool2-version>
<commons-primitives-version>1.0</commons-primitives-version>
<commons-net-version>3.6</commons-net-version>
- <directory-version>2.0.0-M6</directory-version>
- <ftpserver-version>1.0.6</ftpserver-version>
+ <directory-version>2.0.0.AM25</directory-version>
+ <ftpserver-version>1.1.1</ftpserver-version>
<geronimo-version>1.0</geronimo-version>
- <guava-version>18.0</guava-version>
+ <guava-version>28.0-jre</guava-version>
<hadoop-version>1.0.4</hadoop-version>
<hawtbuf-version>1.11</hawtbuf-version>
<hawtdispatch-version>1.22</hawtdispatch-version>
<howl-version>0.1.8</howl-version>
<hsqldb-version>1.8.0.12</hsqldb-version>
- <httpclient-version>4.5.6</httpclient-version>
- <httpcore-version>4.4.10</httpcore-version>
+ <httpclient-version>4.5.9</httpclient-version>
+ <httpcore-version>4.4.11</httpcore-version>
<insight-version>1.2.0.Beta4</insight-version>
- <jackson-version>2.9.8</jackson-version>
+ <jackson-version>2.9.9</jackson-version>
+ <jackson-databind-version>2.9.9.3</jackson-databind-version>
<jasypt-version>1.9.2</jasypt-version>
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
- <jdom-version>1.0</jdom-version>
<jetty9-version>9.2.26.v20180806</jetty9-version>
<jetty-version>${jetty9-version}</jetty-version>
<jmdns-version>3.4.1</jmdns-version>
<tomcat-api-version>8.0.53</tomcat-api-version>
- <jettison-version>1.3.8</jettison-version>
+ <jettison-version>1.4.0</jettison-version>
<jmock-version>2.5.1</jmock-version>
<jolokia-version>1.6.0</jolokia-version>
<josql-version>1.5_5</josql-version>
@@ -89,13 +89,13 @@
<junit-version>4.12</junit-version>
<hamcrest-version>1.3</hamcrest-version>
<jxta-version>2.0</jxta-version>
- <karaf-version>4.2.3</karaf-version>
+ <karaf-version>4.2.6</karaf-version>
<leveldb-api-version>0.9</leveldb-api-version>
<leveldb-version>0.9</leveldb-version>
<leveldbjni-version>1.8</leveldbjni-version>
<log4j-version>1.2.17</log4j-version>
<mockito-version>1.10.19</mockito-version>
- <owasp-dependency-check-version>3.3.0</owasp-dependency-check-version>
+ <owasp-dependency-check-version>4.0.2</owasp-dependency-check-version>
<powermock-version>1.6.5</powermock-version>
<mqtt-client-version>1.15</mqtt-client-version>
<openjpa-version>1.2.0</openjpa-version>
@@ -103,23 +103,23 @@
<org.osgi.core-version>4.3.1</org.osgi.core-version>
<p2psockets-version>1.1.2</p2psockets-version>
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
- <zookeeper-version>3.4.6</zookeeper-version>
- <qpid-proton-version>0.32.0</qpid-proton-version>
- <qpid-jms-version>0.41.0</qpid-jms-version>
- <qpid-jms-netty-version>4.1.34.Final</qpid-jms-netty-version>
- <qpid-jms-proton-version>0.32.0</qpid-jms-proton-version>
- <netty-all-version>4.1.34.Final</netty-all-version>
+ <zookeeper-version>3.4.14</zookeeper-version>
+ <qpid-proton-version>0.33.2</qpid-proton-version>
+ <qpid-jms-version>0.44.0</qpid-jms-version>
+ <qpid-jms-netty-version>4.1.37.Final</qpid-jms-netty-version>
+ <qpid-jms-proton-version>0.33.2</qpid-jms-proton-version>
+ <netty-all-version>4.1.37.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
- <rome-version>1.0</rome-version>
+ <rome-version>1.12.1</rome-version>
<saxon-version>9.5.1-5</saxon-version>
<saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
<scala-plugin-version>3.1.0</scala-plugin-version>
<scala-version>2.11.0</scala-version>
- <shiro-version>1.2.6</shiro-version>
+ <shiro-version>1.4.1</shiro-version>
<scalatest-version>2.1.5</scalatest-version>
<slf4j-version>1.7.25</slf4j-version>
<snappy-version>1.1.2</snappy-version>
- <spring-version>4.3.18.RELEASE</spring-version>
+ <spring-version>4.3.24.RELEASE</spring-version>
<stax2-api-version>3.0.2</stax2-api-version>
<taglibs-version>1.2.5</taglibs-version>
<velocity-version>1.7</velocity-version>
@@ -129,8 +129,8 @@
<xmlbeans-bundle-version>2.6.0_2</xmlbeans-bundle-version>
<xmlresolver-bundle-version>1.2_5</xmlresolver-bundle-version>
<xpp3-version>1.1.4c</xpp3-version>
- <xstream-version>1.4.10</xstream-version>
- <xbean-version>4.2</xbean-version>
+ <xstream-version>1.4.11.1</xstream-version>
+ <xbean-version>4.14</xbean-version>
<xerces-version>2.12.0</xerces-version>
<jaxb-basics-version>0.6.4</jaxb-basics-version>
<stompjms-version>1.19</stompjms-version>
@@ -138,7 +138,7 @@
<pax-exam-version>4.9.1</pax-exam-version>
<paxexam-karaf-container-version>1.0.0</paxexam-karaf-container-version>
<pax-runner-version>1.8.6</pax-runner-version>
- <pax-url-version>2.4.3</pax-url-version>
+ <pax-url-version>2.6.1</pax-url-version>
<felix-configadmin-version>1.8.0</felix-configadmin-version>
<felix-framework-version>5.0.1</felix-framework-version>
@@ -151,7 +151,7 @@
<!-- Maven Plugin Version for this Project -->
<maven-bundle-plugin-version>2.3.7</maven-bundle-plugin-version>
- <maven-surefire-plugin-version>2.16</maven-surefire-plugin-version>
+ <maven-surefire-plugin-version>2.22.1</maven-surefire-plugin-version>
<maven-antrun-plugin-version>1.3</maven-antrun-plugin-version>
<maven-assembly-plugin-version>2.4</maven-assembly-plugin-version>
<maven-release-plugin-version>2.4.1</maven-release-plugin-version>
@@ -175,6 +175,8 @@
<maven-dependency-plugin-version>2.8</maven-dependency-plugin-version>
<maven-project-info-reports-plugin-version>2.7</maven-project-info-reports-plugin-version>
<maven-graph-plugin-version>1.30</maven-graph-plugin-version>
+ <maven-plugin-plugin-version>3.6.0</maven-plugin-plugin-version>
+ <maven-core-version>3.6.1</maven-core-version>
<!-- OSGi bundles properties -->
<activemq.osgi.import.pkg>*</activemq.osgi.import.pkg>
<activemq.osgi.export.pkg>org.apache.activemq*</activemq.osgi.export.pkg>
@@ -672,7 +674,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson-version}</version>
+ <version>${jackson-databind-version}</version>
</dependency>
<!-- Used to configure the activemq logs -->
@@ -1082,16 +1084,11 @@
<!-- ACTIVEMQ-WEB Specific Dependencies -->
<dependency>
- <groupId>rome</groupId>
+ <groupId>com.rometools</groupId>
<artifactId>rome</artifactId>
<version>${rome-version}</version>
</dependency>
<dependency>
- <groupId>jdom</groupId>
- <artifactId>jdom</artifactId>
- <version>${jdom-version}</version>
- </dependency>
- <dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>${mqtt-client-version}</version>
@@ -1287,6 +1284,11 @@
<artifactId>maven-archiver</artifactId>
<version>${maven-archiver-version}</version>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-plugin-plugin</artifactId>
+ <version>${maven-plugin-plugin-version}</version>
+ </plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings only.
It has no influence on the Maven build itself.-->
<plugin>