Merge pull request #611 from jbonofre/AMQ-7444
[AMQ-7444] Fallback to remote location for remote address when socket has not been created yet
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index def4218..92dfe92 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -24,10 +24,14 @@
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
+import org.apache.activemq.Message;
+import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.scheduler.Job;
import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSupport;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
/**
* MBean object that can be used to manage a single instance of a JobScheduler. The object
@@ -77,6 +81,24 @@
}
@Override
+ public int getDelayedMessageCount() throws Exception {
+     // Counts the jobs whose stored message carries a positive
+     // AMQ_SCHEDULED_DELAY property. Each job payload is unmarshalled with an
+     // OpenWireFormat so the message properties can be read back.
+     int counter = 0;
+     OpenWireFormat wireFormat = new OpenWireFormat();
+     for (Job job : jobScheduler.getAllJobs()) {
+         Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload()));
+         // A job's message may not carry the delay property at all. Reading an
+         // absent property as a long raises NumberFormatException (JMS
+         // null-conversion rule), which would abort the whole count, so test
+         // for presence before reading the value.
+         if (msg.propertyExists(ScheduledMessage.AMQ_SCHEDULED_DELAY)
+                 && msg.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY) > 0) {
+             counter++;
+         }
+     }
+     return counter;
+ }
+
+ @Override
+ public int getScheduledMessageCount() throws Exception {
+     // Every job currently returned by the scheduler counts as one scheduled message.
+     final int jobCount = jobScheduler.getAllJobs().size();
+     return jobCount;
+ }
+
+ @Override
public TabularData getNextScheduleJobs() throws Exception {
OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
CompositeType ct = factory.getCompositeType();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index 82a48ae..9aedbef 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -122,4 +122,24 @@
@MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
+ /**
+ * Get the number of messages in the scheduler.
+ *
+ * @return the number of messages in the scheduler.
+ *
+ * @throws Exception if an error occurs while querying the scheduler store.
+ */
+ @MBeanInfo("get the number of scheduled messages (basically messages in the scheduler)")
+ public abstract int getScheduledMessageCount() throws Exception;
+
+ /**
+ * Get the number of delayed messages. In the JobSchedulerView implementation
+ * this is the number of scheduled jobs whose message has a
+ * ScheduledMessage.AMQ_SCHEDULED_DELAY property greater than zero.
+ *
+ * @return the number of delayed messages.
+ *
+ * @throws Exception if an error occurs while querying the scheduler store.
+ */
+ @MBeanInfo("get the number of delayed message")
+ public abstract int getDelayedMessageCount() throws Exception;
+
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 6f654a4..41c6a48 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -116,7 +116,11 @@
if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
// Slow consumers should log and set their state as such.
if (!isSlowConsumer()) {
- LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
+ String remoteAddr = null;
+ if (context != null && context.getConnection() != null) {
+ remoteAddr = context.getConnection().getRemoteAddress();
+ }
+ LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow{}", toString(), (remoteAddr != null) ? ": " + remoteAddr : "");
setSlowConsumer(true);
for (Destination dest: destinations) {
dest.slowConsumer(getContext(), this);
@@ -133,7 +137,7 @@
return;
}
if (!warnedAboutWait) {
- LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
+ LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
new Object[]{
toString(),
matched,
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
index 7a0e58c..47d4754 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
@@ -369,7 +369,10 @@
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
- if (!(desc.getName().equals("java.lang.String") || desc.getName().startsWith("java.util."))) {
+ if (!(desc.getName().startsWith("java.lang.")
+ || desc.getName().startsWith("com.thoughtworks.xstream")
+ || desc.getName().startsWith("java.util.")
+ || desc.getName().startsWith("org.apache.activemq."))) {
throw new InvalidClassException("Unauthorized deserialization attempt", desc.getName());
}
return super.resolveClass(desc);
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index 0a717f4..396b650 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -40,7 +40,7 @@
private final ClassLoader inLoader;
static {
- serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper").split(",");
+ serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","java.lang,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper").split(",");
}
public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index e30f3bf..6e0688b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -4254,8 +4254,10 @@
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
- if (!(desc.getName().startsWith("java.lang.") || desc.getName().startsWith("java.util.")
- || desc.getName().startsWith("org.apache.activemq."))) {
+ if (!(desc.getName().startsWith("java.lang.")
+ || desc.getName().startsWith("com.thoughtworks.xstream")
+ || desc.getName().startsWith("java.util.")
+ || desc.getName().startsWith("org.apache.activemq."))) {
throw new InvalidClassException("Unauthorized deserialization attempt", desc.getName());
}
return super.resolveClass(desc);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
index ef71d1d..7797ae3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.broker.virtual;
import java.util.ArrayList;