This closes #564
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index bb62541..fb46768 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -30,10 +30,11 @@
     private Collection forwardTo;
     private boolean forwardOnly = true;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched = false;
 
     @Override
     public Destination intercept(Destination destination) {
-        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
+        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isSendWhenNotMatched(), isConcurrentSend());
     }
 
     @Override
@@ -192,4 +193,26 @@
 
         return true;
     }
+
+    /**
+     * Reports whether the composite destination itself receives a message only
+     * when none of the forward destinations matched it.
+     *
+     * @return the current sendWhenNotMatched setting; {@code false} unless changed
+     */
+    public boolean isSendWhenNotMatched() {
+        return sendWhenNotMatched;
+    }
+
+    /**
+     * Controls delivery to the composite destination itself. When {@code true},
+     * the filter created by {@link #intercept(Destination)} passes the message to
+     * the underlying destination only if no forward destination matched, and the
+     * forwardOnly flag is not consulted.
+     *
+     * @param sendWhenNotMatched the new setting
+     */
+    public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
+        this.sendWhenNotMatched = sendWhenNotMatched;
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 119257d..5bd5716 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -41,12 +41,14 @@
     private Collection forwardDestinations;
     private boolean forwardOnly;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched = false;
 
-    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
+    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean sendWhenNotMatched, boolean concurrentSend) {
         super(next);
         this.forwardDestinations = forwardDestinations;
         this.forwardOnly = forwardOnly;
         this.concurrentSend = concurrentSend;
+        this.sendWhenNotMatched = sendWhenNotMatched;
     }
 
     @Override
@@ -100,9 +102,17 @@
                 doForward(context, message, brokerService.getRegionBroker(), destination);
             }
         }
-        if (!forwardOnly) {
-            super.send(context, message);
+        if (sendWhenNotMatched) {
+            // sendWhenNotMatched takes precedence over forwardOnly: the underlying
+            // destination gets the message only when matchingDestinations is empty.
+            if (matchingDestinations.isEmpty()) {
+                super.send(context, message);
+            }
+        } else if (!forwardOnly) {
+            // Flag not set: deliver to the underlying destination unless forwardOnly.
+            super.send(context, message);
         }
+
         concurrent.await();
         if (exceptionAtomicReference.get() != null) {
             throw exceptionAtomicReference.get();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
new file mode 100644
index 0000000..ef71d1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.spring.ConsumerBean;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises the {@code sendWhenNotMatched} option of {@link CompositeQueue}
+ * alongside the existing {@code forwardOnly} option.
+ * <p>
+ * {@link #createBroker()} registers one composite queue per flag combination.
+ * Each test sends a single message to a composite queue and checks how many
+ * copies reach the composite queue itself and its forward destination. The
+ * filtered forwards use the selector {@code testid LIKE 'test%'}, so a
+ * {@code testid} of {@code "test13"} matches while {@code "tet13"} and
+ * {@code "tes13"} do not.
+ */
+public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompositeDestinationSendWhenNotMatchedTest.class);
+
+    /** Selector applied by every filtered forward destination configured in createBroker(). */
+    private static final String SELECTOR = "testid LIKE 'test%'";
+
+    protected int total = 10;
+    protected Connection connection;
+
+    /**
+     * Closes the connection a test opened, if any, then runs the superclass
+     * tear-down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+                connection = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** A.B (both flags true): a non-matching message must arrive on A.B and not on A.C. */
+    @Test
+    public void testSendWhenNotMatched() throws Exception {
+        assertRouting("A.B", "A.C", "tet13", 1, 0, 0);
+    }
+
+    /** A.B (both flags true): a matching message must arrive on A.C and not on A.B. */
+    @Test
+    public void testSendWhenMatched() throws Exception {
+        assertRouting("A.B", "A.C", "test13", 0, 1, 0);
+    }
+
+    /** A.D (forwardOnly=false, sendWhenNotMatched=true): a non-matching message must arrive on A.D only. */
+    @Test
+    public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
+        assertRouting("A.D", "A.E", "tes13", 1, 0, 0);
+    }
+
+    /** A.D (forwardOnly=false, sendWhenNotMatched=true): a matching message must arrive on A.E only. */
+    @Test
+    public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
+        assertRouting("A.D", "A.E", "test13", 0, 1, 1000);
+    }
+
+    /** A.X (only forwardOnly=false set): a matching message must arrive on both A.X and A.Y. */
+    @Test
+    public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
+        assertRouting("A.X", "A.Y", "test13", 1, 1, 0);
+    }
+
+    /** A.X (only forwardOnly=false set): a non-matching message must arrive on A.X only. */
+    @Test
+    public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
+        assertRouting("A.X", "A.Y", "tet13", 1, 0, 0);
+    }
+
+    /** A.W (neither flag set): a matching message must arrive on A.V only. */
+    @Test
+    public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
+        assertRouting("A.W", "A.V", "test13", 0, 1, 0);
+    }
+
+    /** A.W (neither flag set): a non-matching message must arrive on neither A.W nor A.V. */
+    @Test
+    public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
+        assertRouting("A.W", "A.V", "tet13", 0, 0, 2000);
+    }
+
+    /** X.Y (forwardOnly=true, unfiltered forward): the message must arrive on X.Z only. */
+    @Test
+    public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
+        assertRouting("X.Y", "X.Z", "tet13", 0, 1, 0);
+    }
+
+    /** R.S (forwardOnly=false, unfiltered forward): the message must arrive on both R.S and R.T. */
+    @Test
+    public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
+        assertRouting("R.S", "R.T", "tet13", 1, 1, 0);
+    }
+
+    /**
+     * Sends one message to the composite queue and asserts how many messages
+     * the consumers on the composite queue and on the forward queue received.
+     * The larger expectation is asserted first.
+     * NOTE(review): presumably assertMessagesArrived waits for the expected
+     * count, so a zero-count check then runs after delivery - confirm against
+     * ConsumerBean.
+     *
+     * @param compositeName       composite queue the producer sends to
+     * @param forwardName         queue the composite queue forwards to
+     * @param testId              value of the {@code testid} message property
+     * @param expectedOnComposite messages expected on the composite queue
+     * @param expectedOnForward   messages expected on the forward queue
+     * @param settleMillis        pause between sending and asserting; 0 for none
+     */
+    private void assertRouting(String compositeName, String forwardName, String testId,
+            int expectedOnComposite, int expectedOnForward, long settleMillis) throws Exception {
+        if (connection == null) {
+            connection = createConnection();
+        }
+        connection.start();
+
+        ConsumerBean compositeMessages = new ConsumerBean();
+        ConsumerBean forwardMessages = new ConsumerBean();
+        compositeMessages.setVerbose(true);
+        forwardMessages.setVerbose(true);
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination producerDestination = new ActiveMQQueue(compositeName);
+        Destination compositeDestination = new ActiveMQQueue(compositeName);
+        Destination forwardDestination = new ActiveMQQueue(forwardName);
+
+        LOG.info("Sending to: " + producerDestination);
+        LOG.info("Consuming from: " + compositeDestination + " and " + forwardDestination);
+
+        MessageConsumer compositeConsumer = session.createConsumer(compositeDestination);
+        MessageConsumer forwardConsumer = session.createConsumer(forwardDestination);
+        compositeConsumer.setMessageListener(compositeMessages);
+        forwardConsumer.setMessageListener(forwardMessages);
+
+        MessageProducer producer = session.createProducer(producerDestination);
+        assertNotNull(producer);
+        producer.send(createMessage(session, testId));
+
+        if (settleMillis > 0) {
+            Thread.sleep(settleMillis);
+        }
+
+        if (expectedOnForward > expectedOnComposite) {
+            forwardMessages.assertMessagesArrived(expectedOnForward);
+            compositeMessages.assertMessagesArrived(expectedOnComposite);
+        } else {
+            compositeMessages.assertMessagesArrived(expectedOnComposite);
+            forwardMessages.assertMessagesArrived(expectedOnForward);
+        }
+    }
+
+    /** Returns a new {@code A.B} queue; not used by the tests in this class. */
+    protected Destination getConsumer1Dsetination() {
+        return new ActiveMQQueue("A.B");
+    }
+
+    /** Returns a new {@code A.C} queue; not used by the tests in this class. */
+    protected Destination getConsumer2Dsetination() {
+        return new ActiveMQQueue("A.C");
+    }
+
+    /** Returns a new {@code A.B} queue; not used by the tests in this class. */
+    protected Destination getProducerDestination() {
+        return new ActiveMQQueue("A.B");
+    }
+
+    /**
+     * Creates a text message carrying the given {@code testid} string property,
+     * the property the filtered destinations' selector refers to.
+     */
+    protected TextMessage createMessage(Session session, String testid) throws JMSException {
+        TextMessage textMessage = session.createTextMessage("testMessage");
+        textMessage.setStringProperty("testid", testid);
+        return textMessage;
+    }
+
+    /**
+     * Builds a broker with one composite queue per flag combination:
+     * <ul>
+     * <li>A.B, filtered forward to A.C: forwardOnly=true, sendWhenNotMatched=true</li>
+     * <li>A.D, filtered forward to A.E: forwardOnly=false, sendWhenNotMatched=true</li>
+     * <li>A.X, filtered forward to A.Y: forwardOnly=false, sendWhenNotMatched not set</li>
+     * <li>A.W, filtered forward to A.V: neither flag set</li>
+     * <li>X.Y, plain forward to X.Z: forwardOnly=true</li>
+     * <li>R.S, plain forward to R.T: forwardOnly=false</li>
+     * </ul>
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.getManagementContext().setCreateConnector(false);
+        answer.addConnector(bindAddress);
+
+        // sendWhenNotMatched=true: the composite queue gets the message only when
+        // no forward destination matched; forwardOnly then has no effect.
+        CompositeQueue notMatchedForwardOnly = compositeQueue("A.B", filteredQueue("A.C"));
+        notMatchedForwardOnly.setForwardOnly(true);
+        notMatchedForwardOnly.setSendWhenNotMatched(true);
+
+        CompositeQueue notMatchedNotForwardOnly = compositeQueue("A.D", filteredQueue("A.E"));
+        notMatchedNotForwardOnly.setForwardOnly(false);
+        notMatchedNotForwardOnly.setSendWhenNotMatched(true);
+
+        // Backward compatibility: sendWhenNotMatched is never set on the remaining queues.
+        CompositeQueue legacyNotForwardOnly = compositeQueue("A.X", filteredQueue("A.Y"));
+        legacyNotForwardOnly.setForwardOnly(false);
+
+        CompositeQueue legacyDefaults = compositeQueue("A.W", filteredQueue("A.V"));
+
+        CompositeQueue plainForwardOnly = compositeQueue("X.Y", new ActiveMQQueue("X.Z"));
+        plainForwardOnly.setForwardOnly(true);
+
+        CompositeQueue plainNotForwardOnly = compositeQueue("R.S", new ActiveMQQueue("R.T"));
+        plainNotForwardOnly.setForwardOnly(false);
+
+        VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+        interceptor.setVirtualDestinations(new VirtualDestination[] {
+            notMatchedForwardOnly, notMatchedNotForwardOnly, legacyNotForwardOnly,
+            legacyDefaults, plainForwardOnly, plainNotForwardOnly });
+        answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
+        return answer;
+    }
+
+    /** Creates a composite queue with the given name and a single forward destination. */
+    private static CompositeQueue compositeQueue(String name, Object forwardTo) {
+        CompositeQueue queue = new CompositeQueue();
+        queue.setName(name);
+        List<Object> forwardDestinations = new ArrayList<Object>();
+        forwardDestinations.add(forwardTo);
+        queue.setForwardTo(forwardDestinations);
+        return queue;
+    }
+
+    /** Creates a forward destination on the named queue, filtered by {@link #SELECTOR}. */
+    private static FilteredDestination filteredQueue(String queueName) {
+        FilteredDestination filtered = new FilteredDestination();
+        filtered.setQueue(queueName);
+        filtered.setSelector(SELECTOR);
+        return filtered;
+    }
+}