blob: 743a736884a7d8554628d208931df2fc67ac072c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import java.io.File;
import java.io.ByteArrayInputStream;
import java.sql.DriverManager;
import java.sql.Driver;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Blob;
import java.sql.Types;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.TreeMap;
public class DerbyMessageStore implements MessageStore
{
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
private static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
private static final String MESSAGE_META_DATA_TABLE_NAME = "QPID_MESSAGE_META_DATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final int DB_VERSION = 1;
private VirtualHost _virtualHost;
private static Class<Driver> DRIVER_CLASS;
private final AtomicLong _messageId = new AtomicLong(1);
private AtomicBoolean _closed = new AtomicBoolean(false);
private String _connectionURL;
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), PRIMARY KEY ( name ) )";
private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
private static final String CREATE_MESSAGE_META_DATA_TABLE = "CREATE TABLE "+MESSAGE_META_DATA_TABLE_NAME+" ( message_id bigint not null, exchange_name varchar(255) not null, routing_key varchar(255), flag_mandatory smallint not null, flag_immediate smallint not null, content_header blob, chunk_count int not null, PRIMARY KEY ( message_id ) )";
private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, chunk_id int not null, content_chunk blob , PRIMARY KEY (message_id, chunk_id) )";
private static final String SELECT_FROM_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME;
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
"SELECT queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ?";
private static final String DELETE_FROM_MESSAGE_META_DATA = "DELETE FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner) VALUES (?, ?)";
private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, chunk_id, content_chunk ) values (?, ?, ?)";
private static final String INSERT_INTO_MESSAGE_META_DATA = "INSERT INTO " + MESSAGE_META_DATA_TABLE_NAME + "( message_id , exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count ) values (?, ?, ?, ?, ?, ?, ?)";
private static final String SELECT_FROM_MESSAGE_META_DATA =
"SELECT exchange_name , routing_key , flag_mandatory , flag_immediate , content_header , chunk_count FROM " + MESSAGE_META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_FROM_MESSAGE_CONTENT =
"SELECT content_chunk FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? and chunk_id = ?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME;
private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
private enum State
{
INITIAL,
CONFIGURING,
RECOVERING,
STARTED,
CLOSING,
CLOSED
}
private State _state = State.INITIAL;
public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
{
stateTransition(State.INITIAL, State.CONFIGURING);
initialiseDriver();
_virtualHost = virtualHost;
_logger.info("Configuring Derby message store for virtual host " + virtualHost.getName());
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
final String databasePath = config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "derbyDB");
File environmentPath = new File(databasePath);
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
{
throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ "Ensure the path is correct and that the permissions are correct.");
}
}
createOrOpenDatabase(databasePath);
// this recovers durable queues and persistent messages
recover();
stateTransition(State.RECOVERING, State.STARTED);
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
{
if(DRIVER_CLASS == null)
{
DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
}
}
private void createOrOpenDatabase(final String environmentPath) throws SQLException
{
_connectionURL = "jdbc:derby:" + environmentPath + "/" + _virtualHost.getName() + ";create=true";
Connection conn = newConnection();
createVersionTable(conn);
createExchangeTable(conn);
createQueueTable(conn);
createBindingsTable(conn);
createQueueEntryTable(conn);
createMessageMetaDataTable(conn);
createMessageContentTable(conn);
conn.close();
}
private void createVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(DB_VERSION_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_DB_VERSION_TABLE);
stmt.close();
PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
pstmt.setInt(1, DB_VERSION);
pstmt.execute();
pstmt.close();
}
}
private void createExchangeTable(final Connection conn) throws SQLException
{
if(!tableExists(EXCHANGE_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_EXCHANGE_TABLE);
stmt.close();
}
}
private void createQueueTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_QUEUE_TABLE);
stmt.close();
}
}
private void createBindingsTable(final Connection conn) throws SQLException
{
if(!tableExists(BINDINGS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_BINDINGS_TABLE);
stmt.close();
}
}
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_QUEUE_ENTRY_TABLE);
stmt.close();
}
}
private void createMessageMetaDataTable(final Connection conn) throws SQLException
{
if(!tableExists(MESSAGE_META_DATA_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_MESSAGE_META_DATA_TABLE);
stmt.close();
}
}
private void createMessageContentTable(final Connection conn) throws SQLException
{
if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
stmt.execute(CREATE_MESSAGE_CONTENT_TABLE);
stmt.close();
}
}
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
PreparedStatement stmt = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
stmt.setString(1, tableName);
ResultSet rs = stmt.executeQuery();
boolean exists = rs.next();
rs.close();
stmt.close();
return exists;
}
public void recover() throws AMQException
{
stateTransition(State.CONFIGURING, State.RECOVERING);
_logger.info("Recovering persistent state...");
StoreContext context = new StoreContext();
try
{
Map<AMQShortString, AMQQueue> queues = loadQueues();
recoverExchanges();
try
{
beginTran(context);
deliverMessages(context, queues);
_logger.info("Persistent state recovered successfully");
commitTran(context);
}
finally
{
if(inTran(context))
{
abortTran(context);
}
}
}
catch (SQLException e)
{
throw new AMQException("Error recovering persistent state: " + e, e);
}
}
private Map<AMQShortString, AMQQueue> loadQueues() throws SQLException, AMQException
{
Connection conn = newConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
Map<AMQShortString, AMQQueue> queueMap = new HashMap<AMQShortString, AMQQueue>();
while(rs.next())
{
String queueName = rs.getString(1);
String owner = rs.getString(2);
AMQShortString queueNameShortString = new AMQShortString(queueName);
AMQQueue q = AMQQueueFactory.createAMQQueueImpl(queueNameShortString, true, owner == null ? null : new AMQShortString(owner), false, _virtualHost,
null);
_virtualHost.getQueueRegistry().registerQueue(q);
queueMap.put(queueNameShortString,q);
}
return queueMap;
}
private void recoverExchanges() throws AMQException, SQLException
{
for (Exchange exchange : loadExchanges())
{
recoverExchange(exchange);
}
}
private List<Exchange> loadExchanges() throws AMQException, SQLException
{
List<Exchange> exchanges = new ArrayList<Exchange>();
Connection conn = null;
try
{
conn = newConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
Exchange exchange;
while(rs.next())
{
String exchangeName = rs.getString(1);
String type = rs.getString(2);
boolean autoDelete = rs.getShort(3) != 0;
exchange = _virtualHost.getExchangeFactory().createExchange(new AMQShortString(exchangeName), new AMQShortString(type), true, autoDelete, 0);
_virtualHost.getExchangeRegistry().registerExchange(exchange);
exchanges.add(exchange);
}
return exchanges;
}
finally
{
if(conn != null)
{
conn.close();
}
}
}
private void recoverExchange(Exchange exchange) throws AMQException, SQLException
{
_logger.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
Connection conn = null;
try
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString());
ResultSet rs = stmt.executeQuery();
while(rs.next())
{
String queueName = rs.getString(1);
String bindingKey = rs.getString(2);
Blob arguments = rs.getBlob(3);
AMQQueue queue = queueRegistry.getQueue(new AMQShortString(queueName));
if (queue == null)
{
_logger.error("Unkown queue: " + queueName + " cannot be bound to exchange: "
+ exchange.getName());
}
else
{
_logger.info("Restoring binding: (Exchange: " + exchange.getName() + ", Queue: " + queueName
+ ", Routing Key: " + bindingKey + ", Arguments: " + arguments
+ ")");
FieldTable argumentsFT = null;
if(arguments != null)
{
byte[] argumentBytes = arguments.getBytes(0, (int) arguments.length());
ByteBuffer buf = ByteBuffer.wrap(argumentBytes);
argumentsFT = new FieldTable(buf,arguments.length());
}
queue.bind(exchange, bindingKey == null ? null : new AMQShortString(bindingKey), argumentsFT);
}
}
}
finally
{
if(conn != null)
{
conn.close();
}
}
}
public void close() throws Exception
{
_closed.getAndSet(true);
}
public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
{
boolean localTx = getOrCreateTransaction(storeContext);
Connection conn = getConnection(storeContext);
ConnectionWrapper wrapper = (ConnectionWrapper) storeContext.getPayload();
if (_logger.isDebugEnabled())
{
_logger.debug("Message Id: " + messageId + " Removing");
}
// first we need to look up the header to get the chunk count
MessageMetaData mmd = getMessageMetaData(storeContext, messageId);
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
wrapper.setRequiresCommit();
int results = stmt.executeUpdate();
if (results == 0)
{
if (localTx)
{
abortTran(storeContext);
}
throw new AMQException("Message metadata not found for message id " + messageId);
}
stmt.close();
if (_logger.isDebugEnabled())
{
_logger.debug("Deleted metadata for message " + messageId);
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
results = stmt.executeUpdate();
if(results != mmd.getContentChunkCount())
{
if (localTx)
{
abortTran(storeContext);
}
throw new AMQException("Unexpected number of content chunks when deleting message. Expected " + mmd.getContentChunkCount() + " but found " + results);
}
if (localTx)
{
commitTran(storeContext);
}
}
catch (SQLException e)
{
if ((conn != null) && localTx)
{
abortTran(storeContext);
}
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
}
public void createExchange(Exchange exchange) throws AMQException
{
if (_state != State.RECOVERING)
{
try
{
Connection conn = null;
try
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
stmt.setString(1, exchange.getName().toString());
stmt.setString(2, exchange.getType().toString());
stmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
stmt.execute();
stmt.close();
conn.commit();
}
finally
{
if(conn != null)
{
conn.close();
}
}
}
catch (SQLException e)
{
throw new AMQException("Error writing Exchange with name " + exchange.getName() + " to database: " + e, e);
}
}
}
public void removeExchange(Exchange exchange) throws AMQException
{
Connection conn = null;
try
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
stmt.setString(1, exchange.getName().toString());
int results = stmt.executeUpdate();
if(results == 0)
{
throw new AMQException("Exchange " + exchange.getName() + " not found");
}
else
{
conn.commit();
stmt.close();
}
}
catch (SQLException e)
{
throw new AMQException("Error writing deleting with name " + exchange.getName() + " from database: " + e, e);
}
finally
{
if(conn != null)
{
try
{
conn.close();
}
catch (SQLException e)
{
_logger.error(e);
}
}
}
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQException
{
if (_state != State.RECOVERING)
{
Connection conn = null;
try
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
if(args != null)
{
/* This would be the Java 6 way of setting a Blob
Blob blobArgs = conn.createBlob();
blobArgs.setBytes(0, args.getDataAsBytes());
stmt.setBlob(4, blobArgs);
*/
byte[] bytes = args.getDataAsBytes();
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
stmt.setBinaryStream(4, bis, bytes.length);
}
else
{
stmt.setNull(4, Types.BLOB);
}
stmt.executeUpdate();
conn.commit();
stmt.close();
}
catch (SQLException e)
{
throw new AMQException("Error writing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " to database: " + e, e);
}
finally
{
if(conn != null)
{
try
{
conn.close();
}
catch (SQLException e)
{
_logger.error(e);
}
}
}
}
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
throws AMQException
{
Connection conn = null;
try
{
conn = newConnection();
// exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
stmt.setString(1, exchange.getName().toString() );
stmt.setString(2, queue.getName().toString());
stmt.setString(3, routingKey == null ? null : routingKey.toString());
if(stmt.executeUpdate() != 1)
{
throw new AMQException("Queue binding for queue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " not found");
}
conn.commit();
stmt.close();
}
catch (SQLException e)
{
throw new AMQException("Error removing binding for AMQQueue with name " + queue.getName() + " to exchange "
+ exchange.getName() + " in database: " + e, e);
}
finally
{
if(conn != null)
{
try
{
conn.close();
}
catch (SQLException e)
{
_logger.error(e);
}
}
}
}
public void createQueue(AMQQueue queue) throws AMQException
{
createQueue(queue, null);
}
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
{
_logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
if (_state != State.RECOVERING)
{
try
{
Connection conn = newConnection();
PreparedStatement stmt =
conn.prepareStatement(INSERT_INTO_QUEUE);
stmt.setString(1, queue.getName().toString());
stmt.setString(2, queue.getOwner() == null ? null : queue.getOwner().toString());
stmt.execute();
stmt.close();
conn.commit();
conn.close();
}
catch (SQLException e)
{
throw new AMQException("Error writing AMQQueue with name " + queue.getName() + " to database: " + e, e);
}
}
}
private Connection newConnection() throws SQLException
{
final Connection connection = DriverManager.getConnection(_connectionURL);
return connection;
}
public void removeQueue(final AMQQueue queue) throws AMQException
{
AMQShortString name = queue.getName();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
Connection conn = null;
try
{
conn = newConnection();
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
stmt.setString(1, name.toString());
int results = stmt.executeUpdate();
if (results == 0)
{
throw new AMQException("Queue " + name + " not found");
}
conn.commit();
stmt.close();
}
catch (SQLException e)
{
throw new AMQException("Error writing deleting with name " + name + " from database: " + e, e);
}
finally
{
if(conn != null)
{
try
{
conn.close();
}
catch (SQLException e)
{
_logger.error(e);
}
}
}
}
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
AMQShortString name = queue.getName();
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
connWrapper.requiresCommit();
if(localTx)
{
commitTran(context);
}
if (_logger.isDebugEnabled())
{
_logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
_logger.error("Failed to enqueue: " + e, e);
throw new AMQException("Error writing enqueued message with id " + messageId + " for queue " + name
+ " to database", e);
}
}
public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
AMQShortString name = queue.getName();
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
stmt.setString(1,name.toString());
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
connWrapper.requiresCommit();
if(results != 1)
{
throw new AMQException("Unable to find message with id " + messageId + " on queue " + name);
}
if(localTx)
{
commitTran(context);
}
if (_logger.isDebugEnabled())
{
_logger.debug("Dequeuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
_logger.error("Failed to dequeue: " + e, e);
throw new AMQException("Error deleting enqueued message with id " + messageId + " for queue " + name
+ " from database", e);
}
}
private static final class ConnectionWrapper
{
private final Connection _connection;
private boolean _requiresCommit;
public ConnectionWrapper(Connection conn)
{
_connection = conn;
}
public void setRequiresCommit()
{
_requiresCommit = true;
}
public boolean requiresCommit()
{
return _requiresCommit;
}
public Connection getConnection()
{
return _connection;
}
}
public void beginTran(StoreContext context) throws AMQException
{
if (context.getPayload() != null)
{
throw new AMQException("Fatal internal error: transactional context is not empty at beginTran: "
+ context.getPayload());
}
else
{
try
{
Connection conn = newConnection();
context.setPayload(new ConnectionWrapper(conn));
}
catch (SQLException e)
{
throw new AMQException("Error starting transaction: " + e, e);
}
}
}
public void commitTran(StoreContext context) throws AMQException
{
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
if (connWrapper == null)
{
throw new AMQException("Fatal internal error: transactional context is empty at commitTran");
}
try
{
Connection conn = connWrapper.getConnection();
if(connWrapper.requiresCommit())
{
conn.commit();
if (_logger.isDebugEnabled())
{
_logger.debug("commit tran completed");
}
}
conn.close();
}
catch (SQLException e)
{
throw new AMQException("Error commit tx: " + e, e);
}
finally
{
context.setPayload(null);
}
}
public void abortTran(StoreContext context) throws AMQException
{
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
if (connWrapper == null)
{
throw new AMQException("Fatal internal error: transactional context is empty at abortTran");
}
if (_logger.isDebugEnabled())
{
_logger.debug("abort tran called: " + connWrapper.getConnection());
}
try
{
Connection conn = connWrapper.getConnection();
if(connWrapper.requiresCommit())
{
conn.rollback();
}
conn.close();
}
catch (SQLException e)
{
throw new AMQException("Error aborting transaction: " + e, e);
}
finally
{
context.setPayload(null);
}
}
public boolean inTran(StoreContext context)
{
return context.getPayload() != null;
}
public Long getNewMessageId()
{
return _messageId.getAndIncrement();
}
public void storeContentBodyChunk(StoreContext context,
Long messageId,
int index,
ContentChunk contentBody,
boolean lastContentBody) throws AMQException
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
stmt.setInt(2, index);
byte[] chunkData = new byte[contentBody.getSize()];
contentBody.getData().duplicate().get(chunkData);
/* this would be the Java 6 way of doing things
Blob dataAsBlob = conn.createBlob();
dataAsBlob.setBytes(1L, chunkData);
stmt.setBlob(3, dataAsBlob);
*/
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
stmt.setBinaryStream(3, bis, chunkData.length);
stmt.executeUpdate();
connWrapper.requiresCommit();
if(localTx)
{
commitTran(context);
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
}
public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData mmd)
throws AMQException
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
ConnectionWrapper connWrapper = (ConnectionWrapper) context.getPayload();
try
{
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
stmt.setString(2, mmd.getMessagePublishInfo().getExchange().toString());
stmt.setString(3, mmd.getMessagePublishInfo().getRoutingKey().toString());
stmt.setShort(4, mmd.getMessagePublishInfo().isMandatory() ? (short) 1 : (short) 0);
stmt.setShort(5, mmd.getMessagePublishInfo().isImmediate() ? (short) 1 : (short) 0);
ContentHeaderBody headerBody = mmd.getContentHeaderBody();
final int bodySize = headerBody.getSize();
byte[] underlying = new byte[bodySize];
ByteBuffer buf = ByteBuffer.wrap(underlying);
headerBody.writePayload(buf);
/*
Blob dataAsBlob = conn.createBlob();
dataAsBlob.setBytes(1L, underlying);
stmt.setBlob(6, dataAsBlob);
*/
ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
stmt.setBinaryStream(6,bis,underlying.length);
stmt.setInt(7, mmd.getContentChunkCount());
stmt.executeUpdate();
connWrapper.requiresCommit();
if(localTx)
{
commitTran(context);
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Error writing AMQMessage with id " + messageId + " to database: " + e, e);
}
}
public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_META_DATA);
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
if(rs.next())
{
final AMQShortString exchange = new AMQShortString(rs.getString(1));
final AMQShortString routingKey = rs.getString(2) == null ? null : new AMQShortString(rs.getString(2));
final boolean mandatory = (rs.getShort(3) != (short)0);
final boolean immediate = (rs.getShort(4) != (short)0);
MessagePublishInfo info = new MessagePublishInfo()
{
public AMQShortString getExchange()
{
return exchange;
}
public void setExchange(AMQShortString exchange)
{
}
public boolean isImmediate()
{
return immediate;
}
public boolean isMandatory()
{
return mandatory;
}
public AMQShortString getRoutingKey()
{
return routingKey;
}
} ;
Blob dataAsBlob = rs.getBlob(5);
byte[] dataAsBytes = dataAsBlob.getBytes(1,(int) dataAsBlob.length());
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
ContentHeaderBody chb = ContentHeaderBody.createFromBuffer(buf, dataAsBytes.length);
if(localTx)
{
commitTran(context);
}
return new MessageMetaData(info, chb, rs.getInt(6));
}
else
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Metadata not found for message with id " + messageId);
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
}
}
public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
{
boolean localTx = getOrCreateTransaction(context);
Connection conn = getConnection(context);
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
stmt.setInt(2, index);
ResultSet rs = stmt.executeQuery();
if(rs.next())
{
Blob dataAsBlob = rs.getBlob(1);
final int size = (int) dataAsBlob.length();
byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
final ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
ContentChunk cb = new ContentChunk()
{
public int getSize()
{
return size;
}
public ByteBuffer getData()
{
return buf;
}
public void reduceToFit()
{
}
};
if(localTx)
{
commitTran(context);
}
return cb;
}
else
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Message not found for message with id " + messageId);
}
}
catch (SQLException e)
{
if(localTx)
{
abortTran(context);
}
throw new AMQException("Error reading AMQMessage with id " + messageId + " from database: " + e, e);
}
}
public boolean isPersistent()
{
return true;
}
private void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
{
throw new MessageStoreClosedException();
}
}
private static final class ProcessAction
{
private final AMQQueue _queue;
private final StoreContext _context;
private final AMQMessage _message;
public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
{
_queue = queue;
_context = context;
_message = message;
}
public void process() throws AMQException
{
_queue.enqueue(_context, _message);
}
}
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
throws SQLException, AMQException
{
Map<Long, AMQMessage> msgMap = new HashMap<Long,AMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
final boolean inLocaltran = inTran(context);
Connection conn = null;
try
{
if(inLocaltran)
{
conn = getConnection(context);
}
else
{
conn = newConnection();
}
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
while (rs.next())
{
AMQShortString queueName = new AMQShortString(rs.getString(1));
AMQQueue queue = queues.get(queueName);
if (queue == null)
{
queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
}
long messageId = rs.getLong(2);
maxId = Math.max(maxId, messageId);
AMQMessage message = msgMap.get(messageId);
if(message != null)
{
message.incrementReference();
}
else
{
message = new AMQMessage(messageId, this, messageHandleFactory, txnContext);
msgMap.put(messageId,message);
}
if (_logger.isDebugEnabled())
{
_logger.debug("On recovery, delivering " + message.getMessageId() + " to " + queue.getName());
}
if (_logger.isInfoEnabled())
{
Integer count = queueRecoveries.get(queueName);
if (count == null)
{
count = 0;
}
queueRecoveries.put(queueName, ++count);
}
actions.add(new ProcessAction(queue, context, message));
}
for(ProcessAction action : actions)
{
action.process();
}
_messageId.set(maxId + 1);
}
catch (SQLException e)
{
_logger.error("Error: " + e, e);
throw e;
}
finally
{
if (inLocaltran && conn != null)
{
conn.close();
}
}
if (_logger.isInfoEnabled())
{
_logger.info("Recovered message counts: " + queueRecoveries);
}
}
private Connection getConnection(final StoreContext context)
{
return ((ConnectionWrapper)context.getPayload()).getConnection();
}
private boolean getOrCreateTransaction(StoreContext context) throws AMQException
{
ConnectionWrapper tx = (ConnectionWrapper) context.getPayload();
if (tx == null)
{
beginTran(context);
return true;
}
return false;
}
private synchronized void stateTransition(State requiredState, State newState) throws AMQException
{
if (_state != requiredState)
{
throw new AMQException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
+ "; currently in state: " + _state);
}
_state = newState;
}
}