blob: e4eb5ac7f5ce2e3b5a0f993d4b7d3139fc1ee792 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.example.subscriber;
import org.apache.qpid.example.shared.Statics;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import javax.jms.*;
/**
* Subclass of Subscriber which consumes a heartbeat message
*/
public class MonitoredSubscriber extends Subscriber
{
protected String _monitorDestinationName;
private static final Logger _logger = LoggerFactory.getLogger(MonitoredSubscriber.class);
private MessageConsumer _monitorConsumer;
public MonitoredSubscriber()
{
super();
//lookup queue name and append suffix
_monitorDestinationName = _destination.toString() + Statics.MONITOR_QUEUE_SUFFIX;
}
/**
* MessageListener implementation for this subscriber
*/
public static class MonitorMessageListener implements MessageListener
{
private String _name;
public MonitorMessageListener(String name)
{
_name = name;
}
/**
* Listens for heartbeat messages and acknowledges them
* @param message
*/
public void onMessage(javax.jms.Message message)
{
_logger.info(_name + " monitor got message '" + message + "'");
try
{
_logger.debug("Monitor acknowledging recieved message");
//Now acknowledge the message to clear it from our queue
message.acknowledge();
}
catch(JMSException j)
{
_logger.error("Monitor caught JMSException trying to acknowledge message receipt");
j.printStackTrace();
}
catch(Exception e)
{
_logger.error("Monitor caught unexpected exception trying to handle message");
e.printStackTrace();
}
}
}
/**
* Subscribes to Queue and attaches additional monitor listener
*/
public void subscribeAndMonitor()
{
try
{
_connection = _connectionFactory.createConnection();
//create a transactional session
Session session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//Queue is non-exclusive and not deleted when last consumer detaches
Destination destination = session.createQueue(_monitorDestinationName);
//Create a consumer with a destination of our queue which will use defaults for prefetch etc
_monitorConsumer = session.createConsumer(destination);
//give the monitor message listener a name of it's own
_monitorConsumer.setMessageListener(new MonitoredSubscriber.MonitorMessageListener
("MonitorListener " + System.currentTimeMillis()));
MonitoredSubscriber._logger.info("Starting monitored subscription ...");
_connection.start();
//and now start ordinary consumption too
subscribe();
}
catch (Throwable t)
{
_logger.error("Fatal error: " + t);
t.printStackTrace();
}
}
/**
* Stop consuming
*/
public void stopMonitor()
{
try
{
_monitorConsumer.close();
_monitorConsumer = null;
stop();
}
catch(JMSException j)
{
_logger.error("JMSException trying to Subscriber.stop: " + j.getStackTrace());
}
}
}