Merge pull request #611 from jbonofre/AMQ-7444

[AMQ-7444] Fallback to remote location for remote address when socket has not been created yet
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
index def4218..92dfe92 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerView.java
@@ -24,10 +24,14 @@
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
 
+import org.apache.activemq.Message;
+import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.scheduler.Job;
 import org.apache.activemq.broker.scheduler.JobScheduler;
 import org.apache.activemq.broker.scheduler.JobSupport;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.ByteSequence;
 
 /**
  * MBean object that can be used to manage a single instance of a JobScheduler.  The object
@@ -77,6 +81,24 @@
     }
 
     @Override
+    public int getDelayedMessageCount() throws Exception {
+        int counter = 0;
+        OpenWireFormat wireFormat = new OpenWireFormat();
+        for (Job job : jobScheduler.getAllJobs()) {
+            Message msg = (Message) wireFormat.unmarshal(new ByteSequence(job.getPayload()));
+            if (msg.getLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY) > 0) {
+                counter++;
+            }
+        }
+        return counter;
+    }
+
+    @Override
+    public int getScheduledMessageCount() throws Exception {
+        return this.jobScheduler.getAllJobs().size();
+    }
+
+    @Override
     public TabularData getNextScheduleJobs() throws Exception {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(Job.class);
         CompositeType ct = factory.getCompositeType();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
index 82a48ae..9aedbef 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/JobSchedulerViewMBean.java
@@ -122,4 +122,24 @@
     @MBeanInfo("get the scheduled Jobs in the Store within the time range. Not HTML friendly ")
     public abstract TabularData getAllJobs(@MBeanInfo("start: yyyy-MM-dd hh:mm:ss")String start,@MBeanInfo("finish: yyyy-MM-dd hh:mm:ss")String finish)throws Exception;
 
+    /**
+     * Get the number of messages in the scheduler.
+     *
+     * @return the number of messages in the scheduler.
+     *
+     * @throws Exception if an error occurs while querying the scheduler store.
+     */
+    @MBeanInfo("get the number of scheduled message (basically message in the scheduler")
+    public abstract int getScheduledMessageCount() throws Exception;
+
+    /**
+     * Get the number of delayed messages.
+     *
+     * @return the number of delayed messages.
+     *
+     * @throws Exception if an error occurs while querying the scheduler store.
+     */
+    @MBeanInfo("get the number of delayed message")
+    public abstract int getDelayedMessageCount() throws Exception;
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 6f654a4..41c6a48 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -116,7 +116,11 @@
                 if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
                     // Slow consumers should log and set their state as such.
                     if (!isSlowConsumer()) {
-                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString());
+                        String remoteAddr = null;
+                        if (context != null && context.getConnection() != null) {
+                            remoteAddr = context.getConnection().getRemoteAddress();
+                        }
+                        LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow{}", toString(), (remoteAddr != null) ? ": " + remoteAddr : "");
                         setSlowConsumer(true);
                         for (Destination dest: destinations) {
                             dest.slowConsumer(getContext(), this);
@@ -133,7 +137,7 @@
                                 return;
                             }
                             if (!warnedAboutWait) {
-                                LOG.info("{}: Pending message cursor [{}] is full, temp usag ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
+                                LOG.info("{}: Pending message cursor [{}] is full, temp usage ({}%) or memory usage ({}%) limit reached, blocking message add() pending the release of resources.",
                                         new Object[]{
                                                 toString(),
                                                 matched,
diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
index 7a0e58c..47d4754 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/SubQueueSelectorCacheBroker.java
@@ -369,7 +369,10 @@
 
         @Override
         protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-            if (!(desc.getName().equals("java.lang.String") || desc.getName().startsWith("java.util."))) {
+            if (!(desc.getName().startsWith("java.lang.")
+                    || desc.getName().startsWith("com.thoughtworks.xstream")
+                    || desc.getName().startsWith("java.util.")
+                    || desc.getName().startsWith("org.apache.activemq."))) {
                 throw new InvalidClassException("Unauthorized deserialization attempt", desc.getName());
             }
             return super.resolveClass(desc);
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index 0a717f4..396b650 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -40,7 +40,7 @@
     private final ClassLoader inLoader;
 
     static {
-        serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper").split(",");
+        serializablePackages = System.getProperty("org.apache.activemq.SERIALIZABLE_PACKAGES","java.lang,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper").split(",");
     }
 
     public ClassLoadingAwareObjectInputStream(InputStream in) throws IOException {
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index e30f3bf..6e0688b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -4254,8 +4254,10 @@
 
         @Override
         protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-            if (!(desc.getName().startsWith("java.lang.") || desc.getName().startsWith("java.util.")
-                || desc.getName().startsWith("org.apache.activemq."))) {
+            if (!(desc.getName().startsWith("java.lang.")
+                    || desc.getName().startsWith("com.thoughtworks.xstream")
+                    || desc.getName().startsWith("java.util.")
+                    || desc.getName().startsWith("org.apache.activemq."))) {
                 throw new InvalidClassException("Unauthorized deserialization attempt", desc.getName());
             }
             return super.resolveClass(desc);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
index ef71d1d..7797ae3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.activemq.broker.virtual;
 
 import java.util.ArrayList;