blob: 87fc543dbe885bd0af227cec54942e90bb0d6d79 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.example.publisher;
import org.apache.qpid.client.AMQConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.MessageProducer;
import javax.jms.Connection;
import javax.jms.Session;
import javax.naming.InitialContext;
import org.apache.qpid.example.shared.InitialContextHelper;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class Publisher
{
private static final Logger _log = LoggerFactory.getLogger(Publisher.class);
protected InitialContextHelper _contextHelper;
protected Connection _connection;
protected Session _session;
protected MessageProducer _producer;
protected String _destinationDir;
protected String _name = "Publisher";
protected Queue _destination;
protected static final String _defaultDestinationDir = "/tmp";
/**
* Creates a Publisher instance using properties from example.properties
* See InitialContextHelper for details of how context etc created
*/
public Publisher()
{
try
{
//get an initial context from default properties
_contextHelper = new InitialContextHelper(null);
InitialContext ctx = _contextHelper.getInitialContext();
//then create a connection using the AMQConnectionFactory
AMQConnectionFactory cf = (AMQConnectionFactory) ctx.lookup("local");
_connection = cf.createConnection();
//create a transactional session
_session = _connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//lookup the example queue and use it
//Queue is non-exclusive and not deleted when last consumer detaches
_destination = (Queue) ctx.lookup("MyQueue");
//create a message producer
_producer = _session.createProducer(_destination);
//set destination dir for files that have been processed
_destinationDir = _defaultDestinationDir;
_connection.start();
}
catch (Exception e)
{
e.printStackTrace();
_log.error("Exception", e);
}
}
/**
* Publishes a non-persistent message using transacted session
* Note that persistent is the default mode for send - so need to specify for transient
*/
public boolean sendMessage(Message message)
{
try
{
//Send message via our producer which is not persistent
_producer.send(message, DeliveryMode.NON_PERSISTENT, _producer.getPriority(), _producer.getTimeToLive());
//commit the message send and close the transaction
_session.commit();
}
catch (JMSException e)
{
//Have to assume our commit failed and rollback here
try
{
_session.rollback();
_log.error("JMSException", e);
e.printStackTrace();
return false;
}
catch (JMSException j)
{
_log.error("Unable to rollback publish transaction ",e);
return false;
}
}
_log.info(_name + " finished sending message: " + message);
return true;
}
/**
* Cleanup resources before exit
*/
public void cleanup()
{
try
{
if (_connection != null)
{
_connection.stop();
_connection.close();
}
_connection = null;
_producer = null;
}
catch(Exception e)
{
_log.error("Error trying to cleanup publisher " + e);
System.exit(1);
}
}
/**
* Exposes session
* @return Session
*/
public Session getSession()
{
return _session;
}
public String getDestinationDir()
{
return _destinationDir;
}
public void setDestinationDir(String destinationDir)
{
_destinationDir = destinationDir;
}
public String getName()
{
return _name;
}
public void setName(String _name) {
this._name = _name;
}
}