blob: 4d63136a9d8bc3ce94bd0d25d9a205a9bc1f927d [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import org.apache.qpid.server.logging.messages.TransactionLogMessages;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
* mechanism.
*
* TODO extract the SQL statements into a generic JDBC store
*/
public class DerbyMessageStore implements MessageStore, DurableConfigurationStore
{
// Class-wide log4j logger.
private static final Logger _logger = Logger.getLogger(DerbyMessageStore.class);
// Store-configuration key naming the directory that holds the Derby database (read in commonConfiguration()).
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
// Fully-qualified class name of the JDBC driver loaded by initialiseDriver().
private static final String SQL_DRIVER_NAME = "org.apache.derby.jdbc.EmbeddedDriver";
// Names of the tables this store creates and queries.
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final String LINKS_TABLE_NAME = "QPID_LINKS";
private static final String BRIDGES_TABLE_NAME = "QPID_BRIDGES";
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
// Version number inserted into the version table when createVersionTable() first creates it.
private static final int DB_VERSION = 3;
// Driver class; null until initialiseDriver() has loaded it.
private static Class<Driver> DRIVER_CLASS;
// Counter from which addMessage() draws the id of each new stored message.
private final AtomicLong _messageId = new AtomicLong(0);
// Set to true by close().
private AtomicBoolean _closed = new AtomicBoolean(false);
// JDBC URL of the database; assigned in createOrOpenDatabase().
private String _connectionURL;
// Probe used by tableExists(): returns a row when the named table is present in the system catalogue.
private static final String TABLE_EXISTANCE_QUERY = "SELECT 1 FROM SYS.SYSTABLES WHERE TABLENAME = ?";
// --- Version table ---
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
// --- DDL for exchanges, queues and bindings ---
private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))";
private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
// --- Queries and updates for exchanges, queues and bindings ---
private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
private static final String SELECT_FROM_BINDINGS =
"SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
private static final String FIND_BINDING =
"SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)";
private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
// --- Queue entries: one row per (queue, message) pair ---
private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
// --- Message meta data and content; content rows are keyed by (message_id, offset) ---
private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT =
"SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
private static final String SELECT_FROM_META_DATA =
"SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
// --- Broker links: the id is stored as two bigint columns (id_lsb, id_msb) ---
private static final String CREATE_LINKS_TABLE =
"CREATE TABLE "+LINKS_TABLE_NAME+" ( id_lsb bigint not null,"
+ " id_msb bigint not null,"
+ " create_time bigint not null,"
+ " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
// Looks up one broker link by both halves of its id. Each id column is compared against
// its own bind parameter, so the statement takes two parameters (id_lsb, id_msb).
private static final String SELECT_FROM_LINKS =
        "SELECT create_time, arguments FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
// --- Remaining broker-link statements ---
private static final String DELETE_FROM_LINKS = "DELETE FROM " + LINKS_TABLE_NAME
+ " WHERE id_lsb = ? and id_msb = ?";
private static final String SELECT_ALL_FROM_LINKS = "SELECT id_lsb, id_msb, create_time, "
+ "arguments FROM " + LINKS_TABLE_NAME;
private static final String FIND_LINK = "SELECT id_lsb, id_msb FROM " + LINKS_TABLE_NAME + " WHERE id_lsb = ? and"
+ " id_msb = ?";
private static final String INSERT_INTO_LINKS = "INSERT INTO " + LINKS_TABLE_NAME + "( id_lsb, "
+ "id_msb, create_time, arguments ) values (?, ?, ?, ?)";
// --- Bridges: each row carries its own id plus the id of the link it belongs to ---
private static final String CREATE_BRIDGES_TABLE =
"CREATE TABLE "+BRIDGES_TABLE_NAME+" ( id_lsb bigint not null,"
+ " id_msb bigint not null,"
+ " create_time bigint not null,"
+ " link_id_lsb bigint not null,"
+ " link_id_msb bigint not null,"
+ " arguments blob, PRIMARY KEY ( id_lsb, id_msb ))";
private static final String SELECT_FROM_BRIDGES =
"SELECT create_time, link_id_lsb, link_id_msb, arguments FROM "
+ BRIDGES_TABLE_NAME + " WHERE id_lsb = ? and id_msb = ?";
private static final String DELETE_FROM_BRIDGES = "DELETE FROM " + BRIDGES_TABLE_NAME
+ " WHERE id_lsb = ? and id_msb = ?";
// Despite the name, this selects only the bridges of one link (filtered by link id).
private static final String SELECT_ALL_FROM_BRIDGES = "SELECT id_lsb, id_msb, "
+ " create_time,"
+ " link_id_lsb, link_id_msb, "
+ "arguments FROM " + BRIDGES_TABLE_NAME
+ " WHERE link_id_lsb = ? and link_id_msb = ?";
private static final String FIND_BRIDGE = "SELECT id_lsb, id_msb FROM " + BRIDGES_TABLE_NAME +
" WHERE id_lsb = ? and id_msb = ?";
private static final String INSERT_INTO_BRIDGES = "INSERT INTO " + BRIDGES_TABLE_NAME + "( id_lsb, id_msb, "
+ "create_time, "
+ "link_id_lsb, link_id_msb, "
+ "arguments )"
+ " values (?, ?, ?, ?, ?, ?)";
// --- Xids: keyed by (format, global_id, branch_id) ---
private static final String CREATE_XIDS_TABLE =
"CREATE TABLE "+XID_TABLE_NAME+" ( format bigint not null,"
+ " global_id varchar(64) for bit data, branch_id varchar(64) for bit data, PRIMARY KEY ( format, " +
"global_id, branch_id ))";
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+XID_TABLE_NAME+" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
// --- Xid actions: rows of (action_type, queue_name, message_id) recorded against an xid ---
private static final String CREATE_XID_ACTIONS_TABLE =
"CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
+ " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
"action_type char not null, queue_name varchar(255) not null, message_id bigint not null" +
", PRIMARY KEY ( " +
"format, global_id, branch_id, action_type, queue_name, message_id))";
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
"queue_name, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
// Despite the name, this selects only the actions of one xid.
private static final String SELECT_ALL_FROM_XID_ACTIONS =
"SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
// SQLState that close() treats as the expected result of its shutdown request.
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
// Log subject passed with operational log messages; assigned by the configure* methods.
private LogSubject _logSubject;
// True once one of the configure* methods has run commonConfiguration().
private boolean _configured;
/**
 * A {@link StoreFuture} that is always complete: {@link #isComplete()} returns true and
 * {@link #waitForCompletion()} returns immediately without doing anything.
 */
private static final class CommitStoreFuture implements StoreFuture
{
// Always reports completion.
public boolean isComplete()
{
return true;
}
// Nothing to wait for; returns straight away.
public void waitForCompletion()
{
}
}
/**
 * Lifecycle states of the store. configureConfigStore() moves INITIAL -> CONFIGURING,
 * recover() moves CONFIGURING -> RECOVERING, and configureConfigStore() finishes with
 * RECOVERING -> STARTED.
 */
private enum State
{
INITIAL,
CONFIGURING,
RECOVERING,
STARTED,
CLOSING,
CLOSED
}
// Current lifecycle state; the create/bind/update methods skip their writes while this is RECOVERING.
private State _state = State.INITIAL;
/**
 * Configures this store as the durable configuration store and recovers the durable
 * configuration through the supplied handler.
 *
 * Moves the store from INITIAL to CONFIGURING on entry and from RECOVERING to STARTED on exit;
 * the CONFIGURING to RECOVERING step is taken inside {@link #recover}.
 *
 * @param name               name passed on to the common configuration (used in the database URL)
 * @param recoveryHandler    handler that receives the recovered configuration
 * @param storeConfiguration configuration from which the store location is read
 * @param logSubject         subject used for operational log messages
 * @throws Exception if configuration or recovery fails
 */
public void configureConfigStore(String name,
                                 ConfigurationRecoveryHandler recoveryHandler,
                                 Configuration storeConfiguration,
                                 LogSubject logSubject) throws Exception
{
    stateTransition(State.INITIAL, State.CONFIGURING);

    _logSubject = logSubject;
    final String storeClassName = this.getClass().getName();
    CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(storeClassName));

    // The database is opened only once, whichever configure* method runs first.
    if (!_configured)
    {
        commonConfiguration(name, storeConfiguration, logSubject);
        _configured = true;
    }

    // this recovers durable exchanges, queues, and bindings
    recover(recoveryHandler);

    stateTransition(State.RECOVERING, State.STARTED);
}
/**
 * Configures this store as the message store and recovers stored messages through the
 * supplied handler.
 *
 * @param name               name passed on to the common configuration (used in the database URL)
 * @param recoveryHandler    handler that receives the recovered messages
 * @param storeConfiguration configuration from which the store location is read
 * @param logSubject         subject used for operational log messages; only adopted when the
 *                           store has not been configured yet
 * @throws Exception if configuration or recovery fails
 */
public void configureMessageStore(String name,
                                  MessageStoreRecoveryHandler recoveryHandler,
                                  Configuration storeConfiguration,
                                  LogSubject logSubject) throws Exception
{
    // Keep the log subject of whichever configure* call came first.
    if (!_configured)
    {
        _logSubject = logSubject;
    }

    final String storeClassName = this.getClass().getName();
    CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(storeClassName));

    if (!_configured)
    {
        commonConfiguration(name, storeConfiguration, logSubject);
        _configured = true;
    }

    recoverMessages(recoveryHandler);
}
/**
 * Configures this store as the transaction log, then recovers queue entries and the
 * recorded distributed-transaction (xid) records through the supplied handler.
 *
 * @param name               name passed on to the common configuration (used in the database URL)
 * @param recoveryHandler    handler that receives the recovered queue entries
 * @param storeConfiguration configuration from which the store location is read
 * @param logSubject         subject used for operational log messages; only adopted when the
 *                           store has not been configured yet
 * @throws Exception if configuration or recovery fails
 */
public void configureTransactionLog(String name,
                                    TransactionLogRecoveryHandler recoveryHandler,
                                    Configuration storeConfiguration,
                                    LogSubject logSubject) throws Exception
{
    // Keep the log subject of whichever configure* call came first.
    if (!_configured)
    {
        _logSubject = logSubject;
    }
    CurrentActor.get().message(_logSubject, TransactionLogMessages.CREATED(this.getClass().getName()));
    if (!_configured)
    {
        // _logSubject was already assigned above, so it is not set a second time here.
        commonConfiguration(name, storeConfiguration, logSubject);
        _configured = true;
    }
    TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(recoveryHandler);
    recoverXids(dtxrh);
}
/**
 * Configuration shared by all configure* entry points: loads the JDBC driver, resolves and
 * (if needed) creates the store directory, logs its location and opens or creates the database.
 *
 * @param name               name appended to the store path to form the database location
 * @param storeConfiguration configuration holding the optional {@link #ENVIRONMENT_PATH_PROPERTY}
 * @param logSubject         not used by this method
 * @throws ClassNotFoundException if the driver class cannot be loaded
 * @throws SQLException           if the database cannot be opened or its tables created
 * @throws IllegalArgumentException if the store directory does not exist and cannot be created
 */
private void commonConfiguration(String name, Configuration storeConfiguration, LogSubject logSubject)
        throws ClassNotFoundException, SQLException
{
    initialiseDriver();

    //Update to pick up QPID_WORK and use that as the default location not just derbyDB
    final String defaultPath = System.getProperty("QPID_WORK") + File.separator + "derbyDB";
    final String databasePath = storeConfiguration.getString(ENVIRONMENT_PATH_PROPERTY, defaultPath);

    final File environmentPath = new File(databasePath);
    if (!environmentPath.exists() && !environmentPath.mkdirs())
    {
        throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
                                           + "Ensure the path is correct and that the permissions are correct.");
    }

    final String absoluteLocation = environmentPath.getAbsolutePath();
    CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(absoluteLocation));

    createOrOpenDatabase(name, databasePath);
}
/**
 * Loads the JDBC driver class named by SQL_DRIVER_NAME the first time it is called;
 * later calls return immediately.
 *
 * @throws ClassNotFoundException if the driver class is not on the classpath
 */
private static synchronized void initialiseDriver() throws ClassNotFoundException
{
    if (DRIVER_CLASS != null)
    {
        // Already loaded by an earlier call.
        return;
    }
    DRIVER_CLASS = (Class<Driver>) Class.forName(SQL_DRIVER_NAME);
}
/**
 * Records the connection URL for the database at {@code environmentPath/name} (created on
 * first connect via {@code create=true}) and makes sure every table the store needs exists.
 *
 * The connection used for table creation is always closed, even when one of the
 * creation steps fails.
 *
 * @param name            name of the database directory under the environment path
 * @param environmentPath directory that holds the database
 * @throws SQLException if the database cannot be opened or a table cannot be created
 */
private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
{
    //FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
    _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
    Connection conn = newAutoCommitConnection();
    try
    {
        createVersionTable(conn);
        createExchangeTable(conn);
        createQueueTable(conn);
        createBindingsTable(conn);
        createQueueEntryTable(conn);
        createMetaDataTable(conn);
        createMessageContentTable(conn);
        createLinkTable(conn);
        createBridgeTable(conn);
        createXidTable(conn);
        createXidActionTable(conn);
    }
    finally
    {
        // Release the connection whether or not table creation succeeded.
        conn.close();
    }
}
/**
 * Creates the version table if it is missing and, when it was just created, records
 * DB_VERSION in it.
 *
 * @param conn connection on which the statements are executed
 * @throws SQLException if a statement fails
 */
private void createVersionTable(final Connection conn) throws SQLException
{
    if (createTableIfNotExists(conn, DB_VERSION_TABLE_NAME, CREATE_DB_VERSION_TABLE))
    {
        // A freshly created version table is empty; stamp it with the current version.
        PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
        try
        {
            pstmt.setInt(1, DB_VERSION);
            pstmt.execute();
        }
        finally
        {
            pstmt.close();
        }
    }
}

/** Creates the exchange table if it does not exist yet. */
private void createExchangeTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, EXCHANGE_TABLE_NAME, CREATE_EXCHANGE_TABLE);
}

/** Creates the queue table if it does not exist yet. */
private void createQueueTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, QUEUE_TABLE_NAME, CREATE_QUEUE_TABLE);
}

/** Creates the bindings table if it does not exist yet. */
private void createBindingsTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, BINDINGS_TABLE_NAME, CREATE_BINDINGS_TABLE);
}

/** Creates the queue-entry table if it does not exist yet. */
private void createQueueEntryTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, QUEUE_ENTRY_TABLE_NAME, CREATE_QUEUE_ENTRY_TABLE);
}

/** Creates the message meta-data table if it does not exist yet. */
private void createMetaDataTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, META_DATA_TABLE_NAME, CREATE_META_DATA_TABLE);
}

/** Creates the message-content table if it does not exist yet. */
private void createMessageContentTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, MESSAGE_CONTENT_TABLE_NAME, CREATE_MESSAGE_CONTENT_TABLE);
}

/** Creates the broker-links table if it does not exist yet. */
private void createLinkTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, LINKS_TABLE_NAME, CREATE_LINKS_TABLE);
}

/** Creates the bridges table if it does not exist yet. */
private void createBridgeTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, BRIDGES_TABLE_NAME, CREATE_BRIDGES_TABLE);
}

/** Creates the xid table if it does not exist yet. */
private void createXidTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, XID_TABLE_NAME, CREATE_XIDS_TABLE);
}

/** Creates the xid-actions table if it does not exist yet. */
private void createXidActionTable(final Connection conn) throws SQLException
{
    createTableIfNotExists(conn, XID_ACTIONS_TABLE_NAME, CREATE_XID_ACTIONS_TABLE);
}

/**
 * Executes the given CREATE TABLE statement unless the table is already present.
 *
 * @param conn      connection on which the statement is executed
 * @param tableName name of the table to look for
 * @param createSql DDL executed when the table is missing
 * @return true if the table was created by this call, false if it already existed
 * @throws SQLException if the existence check or the DDL fails
 */
private boolean createTableIfNotExists(final Connection conn, final String tableName, final String createSql)
        throws SQLException
{
    if (tableExists(tableName, conn))
    {
        return false;
    }
    Statement stmt = conn.createStatement();
    try
    {
        stmt.execute(createSql);
    }
    finally
    {
        stmt.close();
    }
    return true;
}
/**
 * Reports whether the system catalogue lists a table with the given name.
 *
 * @param tableName table name to look up, compared as-is against SYS.SYSTABLES.TABLENAME
 * @param conn      connection on which the lookup is executed
 * @return true if the catalogue query returns at least one row
 * @throws SQLException if the lookup fails
 */
private boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
    final PreparedStatement lookup = conn.prepareStatement(TABLE_EXISTANCE_QUERY);
    try
    {
        lookup.setString(1, tableName);
        final ResultSet rows = lookup.executeQuery();
        try
        {
            final boolean found = rows.next();
            return found;
        }
        finally
        {
            rows.close();
        }
    }
    finally
    {
        lookup.close();
    }
}
/**
 * Recovers the durable configuration in a fixed order - queues, exchanges, bindings, then
 * broker links - handing each stage to the handler returned by completing the previous one.
 *
 * Moves the store from CONFIGURING to RECOVERING before recovery starts.
 *
 * @param recoveryHandler entry point of the recovery handler chain
 * @throws AMQException if the state transition fails or (as AMQStoreException) a database
 *                      error occurs during recovery
 */
public void recover(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
    stateTransition(State.CONFIGURING, State.RECOVERING);
    CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());

    try
    {
        final ConfigurationRecoveryHandler.QueueRecoveryHandler queueHandler = recoveryHandler.begin(this);
        loadQueues(queueHandler);

        final ConfigurationRecoveryHandler.ExchangeRecoveryHandler exchangeHandler =
                queueHandler.completeQueueRecovery();
        final List<String> exchangeNames = loadExchanges(exchangeHandler);

        final ConfigurationRecoveryHandler.BindingRecoveryHandler bindingHandler =
                exchangeHandler.completeExchangeRecovery();
        recoverBindings(bindingHandler, exchangeNames);

        final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler linkHandler =
                bindingHandler.completeBindingRecovery();
        recoverBrokerLinks(linkHandler);
    }
    catch (SQLException sqlFailure)
    {
        throw new AMQStoreException("Error recovering persistent state: " + sqlFailure.getMessage(), sqlFailure);
    }
}
/**
 * Reads every stored broker link, passes it to the handler and then recovers the bridges
 * that belong to it.
 *
 * The arguments blob is decoded as an int entry count followed by that many
 * (key, value) pairs of modified-UTF strings.
 *
 * @param lrh handler that receives each recovered link
 * @throws SQLException if the database access fails, or wrapping an IOException raised
 *                      while decoding or handling a row
 */
private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
        throws SQLException
{
    _logger.info("Recovering broker links...");

    final Connection connection = newAutoCommitConnection();
    try
    {
        final PreparedStatement selectLinks = connection.prepareStatement(SELECT_ALL_FROM_LINKS);
        try
        {
            final ResultSet rows = selectLinks.executeQuery();
            try
            {
                while (rows.next())
                {
                    // Column 2 is id_msb, column 1 is id_lsb.
                    final long mostSigBits = rows.getLong(2);
                    final long leastSigBits = rows.getLong(1);
                    final UUID linkId = new UUID(mostSigBits, leastSigBits);
                    final long created = rows.getLong(3);

                    final Blob argumentBlob = rows.getBlob(4);
                    final byte[] encoded = argumentBlob.getBytes(1, (int) argumentBlob.length());
                    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));

                    final int entryCount = in.readInt();
                    final Map<String, String> linkArguments = new HashMap<String, String>();
                    for (int entry = 0; entry < entryCount; entry++)
                    {
                        final String key = in.readUTF();
                        final String value = in.readUTF();
                        linkArguments.put(key, value);
                    }

                    final ConfigurationRecoveryHandler.BridgeRecoveryHandler bridgeHandler =
                            lrh.brokerLink(linkId, created, linkArguments);
                    recoverBridges(bridgeHandler, linkId);
                }
            }
            catch (IOException ioFailure)
            {
                throw new SQLException(ioFailure.getMessage(), ioFailure);
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            selectLinks.close();
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Reads the stored bridges of one broker link, passes each to the handler and finally
 * signals that bridge recovery for the link is complete.
 *
 * The arguments blob is decoded as an int entry count followed by that many
 * (key, value) pairs of modified-UTF strings.
 *
 * @param brh    handler that receives each recovered bridge
 * @param linkId id of the link whose bridges are recovered
 * @throws SQLException if the database access fails, or wrapping an IOException raised
 *                      while decoding or handling a row
 */
private void recoverBridges(final ConfigurationRecoveryHandler.BridgeRecoveryHandler brh, final UUID linkId)
        throws SQLException
{
    _logger.info("Recovering bridges for link " + linkId + "...");

    final Connection connection = newAutoCommitConnection();
    try
    {
        final PreparedStatement selectBridges = connection.prepareStatement(SELECT_ALL_FROM_BRIDGES);
        try
        {
            selectBridges.setLong(1, linkId.getLeastSignificantBits());
            selectBridges.setLong(2, linkId.getMostSignificantBits());

            final ResultSet rows = selectBridges.executeQuery();
            try
            {
                while (rows.next())
                {
                    // Column 2 is id_msb, column 1 is id_lsb.
                    final long mostSigBits = rows.getLong(2);
                    final long leastSigBits = rows.getLong(1);
                    final UUID bridgeId = new UUID(mostSigBits, leastSigBits);
                    final long created = rows.getLong(3);

                    // Columns 4 and 5 hold the link id, which is already known here.
                    final Blob argumentBlob = rows.getBlob(6);
                    final byte[] encoded = argumentBlob.getBytes(1, (int) argumentBlob.length());
                    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));

                    final int entryCount = in.readInt();
                    final Map<String, String> bridgeArguments = new HashMap<String, String>();
                    for (int entry = 0; entry < entryCount; entry++)
                    {
                        final String key = in.readUTF();
                        final String value = in.readUTF();
                        bridgeArguments.put(key, value);
                    }

                    brh.bridge(bridgeId, created, bridgeArguments);
                }
                brh.completeBridgeRecoveryForLink();
            }
            catch (IOException ioFailure)
            {
                throw new SQLException(ioFailure.getMessage(), ioFailure);
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            selectBridges.close();
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Reads every stored queue and passes its name, owner, exclusivity and arguments to the
 * handler.
 *
 * The arguments column is nullable; a NULL or empty blob is reported to the handler as
 * {@code null} arguments rather than causing a failure.
 *
 * @param qrh handler that receives each recovered queue
 * @throws SQLException if the database access fails
 */
private void loadQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
{
    Connection conn = newAutoCommitConnection();
    try
    {
        Statement stmt = conn.createStatement();
        try
        {
            ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
            try
            {
                while(rs.next())
                {
                    String queueName = rs.getString(1);
                    String owner = rs.getString(2);
                    boolean exclusive = rs.getBoolean(3);
                    Blob argumentsAsBlob = rs.getBlob(4);
                    // getBlob returns null for SQL NULL; treat that the same as an empty blob.
                    byte[] dataAsBytes = argumentsAsBlob == null
                                         ? new byte[0]
                                         : argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
                    FieldTable arguments;
                    if(dataAsBytes.length > 0)
                    {
                        try
                        {
                            arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
                        }
                        catch (IOException e)
                        {
                            // Reading from an in-memory byte array is not expected to fail.
                            throw new RuntimeException("IO Exception should not be thrown",e);
                        }
                    }
                    else
                    {
                        arguments = null;
                    }
                    qrh.queue(queueName, owner, exclusive, arguments);
                }
            }
            finally
            {
                rs.close();
            }
        }
        finally
        {
            stmt.close();
        }
    }
    finally
    {
        conn.close();
    }
}
/**
 * Reads every stored exchange, passes it to the handler and collects the exchange names.
 *
 * @param erh handler that receives each recovered exchange
 * @return the names of all recovered exchanges, in the order the rows were returned
 * @throws SQLException if the database access fails
 */
private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException
{
    final List<String> recoveredNames = new ArrayList<String>();

    final Connection connection = newAutoCommitConnection();
    try
    {
        final Statement select = connection.createStatement();
        try
        {
            final ResultSet rows = select.executeQuery(SELECT_FROM_EXCHANGE);
            try
            {
                while (rows.next())
                {
                    final String name = rows.getString(1);
                    final String exchangeType = rows.getString(2);
                    // autodelete is stored as a SMALLINT: any non-zero value means true.
                    final boolean autoDelete = rows.getShort(3) != 0;

                    recoveredNames.add(name);
                    erh.exchange(name, exchangeType, autoDelete);
                }
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            select.close();
        }
    }
    finally
    {
        connection.close();
    }

    return recoveredNames;
}
/**
 * Reads every stored binding and passes it to the handler. A NULL or empty arguments blob
 * is passed on as a {@code null} buffer.
 *
 * @param brh       handler that receives each recovered binding
 * @param exchanges names of the recovered exchanges; not used by this method
 * @throws SQLException if the database access fails
 */
private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException
{
    _logger.info("Recovering bindings...");

    final Connection connection = newAutoCommitConnection();
    try
    {
        final PreparedStatement selectBindings = connection.prepareStatement(SELECT_FROM_BINDINGS);
        try
        {
            final ResultSet rows = selectBindings.executeQuery();
            try
            {
                while (rows.next())
                {
                    final String exchangeName = rows.getString(1);
                    final String queueName = rows.getString(2);
                    final String bindingKey = rows.getString(3);
                    final Blob argumentBlob = rows.getBlob(4);

                    final java.nio.ByteBuffer argumentBuffer;
                    if (argumentBlob == null || argumentBlob.length() == 0)
                    {
                        argumentBuffer = null;
                    }
                    else
                    {
                        final byte[] raw = argumentBlob.getBytes(1, (int) argumentBlob.length());
                        argumentBuffer = java.nio.ByteBuffer.wrap(raw);
                    }

                    brh.binding(exchangeName, queueName, bindingKey, argumentBuffer);
                }
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            selectBindings.close();
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Marks the store closed and asks Derby to shut this database down.
 *
 * The shutdown request is made by connecting with {@code ;shutdown=true}; that connect is
 * expected to fail with SQLState DERBY_SINGLE_DB_SHUTDOWN_CODE, which is treated as
 * success. Any other outcome is logged as an error and not rethrown.
 *
 * @throws Exception declared by the interface; database errors are logged, not thrown
 */
public void close() throws Exception
{
    CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
    _closed.getAndSet(true);
    try
    {
        Connection conn = DriverManager.getConnection(_connectionURL + ";shutdown=true");
        // Shouldn't reach this point - shutdown=true should throw SQLException
        conn.close();
        _logger.error("Unable to shut down the store");
    }
    catch (SQLException e)
    {
        // Compare constant-first: getSQLState() may be null and must not turn a
        // shutdown problem into a NullPointerException.
        if (!DERBY_SINGLE_DB_SHUTDOWN_CODE.equalsIgnoreCase(e.getSQLState()))
        {
            // Pass the exception itself so the stack trace is kept in the log.
            _logger.error("Exception whilst shutting down the store: " + e, e);
        }
        // Otherwise: expected, represents a clean shutdown of this database only.
    }
}
/**
 * Creates a stored-message wrapper for the given meta data under a newly allocated message id.
 *
 * @param metaData meta data of the message being added
 * @return a StoredDerbyMessage when the meta data is persistent, otherwise a StoredMemoryMessage
 */
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
    if (!metaData.isPersistent())
    {
        return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
    }
    return new StoredDerbyMessage(_messageId.incrementAndGet(), metaData);
}
/**
 * Lookup by message number is not implemented by this store.
 *
 * @param messageNumber id of the requested message (ignored)
 * @return always {@code null}
 */
public StoredMessage getMessage(long messageNumber)
{
return null;
}
/**
 * Deletes the meta data and all content rows of a message in a single transaction.
 *
 * A missing meta-data row is logged as a warning and does not abort the removal. On any
 * database error the transaction is rolled back (best effort) and the error is rethrown
 * wrapped in a RuntimeException.
 *
 * @param messageId id of the message to remove
 * @throws RuntimeException if the database access fails
 */
public void removeMessage(long messageId)
{
    try
    {
        Connection conn = newConnection();
        try
        {
            // Each statement gets its own variable and try/finally so it is closed exactly once.
            int results;
            PreparedStatement metaDataStmt = conn.prepareStatement(DELETE_FROM_META_DATA);
            try
            {
                metaDataStmt.setLong(1, messageId);
                results = metaDataStmt.executeUpdate();
            }
            finally
            {
                metaDataStmt.close();
            }

            if (results == 0)
            {
                _logger.warn("Message metadata not found for message id " + messageId);
            }
            else if (_logger.isDebugEnabled())
            {
                // Only claim a deletion when a row was actually removed.
                _logger.debug("Deleted metadata for message " + messageId);
            }

            PreparedStatement contentStmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
            try
            {
                contentStmt.setLong(1, messageId);
                contentStmt.executeUpdate();
            }
            finally
            {
                contentStmt.close();
            }

            conn.commit();
        }
        catch(SQLException e)
        {
            try
            {
                conn.rollback();
            }
            catch(SQLException ignored)
            {
                // ignore - we are re-throwing underlying exception
            }
            throw e;
        }
        finally
        {
            conn.close();
        }
    }
    catch (SQLException e)
    {
        throw new RuntimeException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
    }
}
/**
 * Stores the exchange unless a row with its name already exists. Does nothing while the
 * store is in the RECOVERING state.
 *
 * @param exchange exchange to persist
 * @throws AMQStoreException if the database access fails
 */
public void createExchange(Exchange exchange) throws AMQStoreException
{
    if (_state == State.RECOVERING)
    {
        return;
    }

    try
    {
        final Connection connection = newAutoCommitConnection();
        try
        {
            final PreparedStatement findStmt = connection.prepareStatement(FIND_EXCHANGE);
            try
            {
                findStmt.setString(1, exchange.getNameShortString().toString());
                final ResultSet existing = findStmt.executeQuery();
                try
                {
                    // If we don't have any data in the result set then we can add this exchange
                    final boolean alreadyStored = existing.next();
                    if (!alreadyStored)
                    {
                        final PreparedStatement insertStmt = connection.prepareStatement(INSERT_INTO_EXCHANGE);
                        try
                        {
                            final short autoDeleteFlag = exchange.isAutoDelete() ? (short) 1 : (short) 0;
                            insertStmt.setString(1, exchange.getName().toString());
                            insertStmt.setString(2, exchange.getTypeShortString().asString());
                            insertStmt.setShort(3, autoDeleteFlag);
                            insertStmt.execute();
                        }
                        finally
                        {
                            insertStmt.close();
                        }
                    }
                }
                finally
                {
                    existing.close();
                }
            }
            finally
            {
                findStmt.close();
            }
        }
        finally
        {
            connection.close();
        }
    }
    catch (SQLException sqlFailure)
    {
        throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + sqlFailure.getMessage(), sqlFailure);
    }
}
/**
 * Deletes the stored row for the given exchange.
 *
 * @param exchange exchange to remove from the store
 * @throws AMQStoreException if no row was deleted or the database access fails
 */
public void removeExchange(Exchange exchange) throws AMQStoreException
{
    try
    {
        Connection conn = newAutoCommitConnection();
        try
        {
            PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
            try
            {
                stmt.setString(1, exchange.getNameShortString().toString());
                int results = stmt.executeUpdate();
                if(results == 0)
                {
                    throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found");
                }
            }
            finally
            {
                // Single close point for the statement, on success and on failure alike.
                stmt.close();
            }
        }
        finally
        {
            conn.close();
        }
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e);
    }
}
/**
 * Stores a binding between the exchange and the queue unless the same
 * (exchange, queue, routing key) row already exists. Does nothing while the store is in
 * the RECOVERING state.
 *
 * @param exchange   exchange the queue is bound to
 * @param routingKey binding key; {@code null} is passed to the database as SQL NULL
 * @param queue      queue being bound
 * @param args       binding arguments, stored as a blob; {@code null} stores SQL NULL
 * @throws AMQStoreException if the database access fails
 */
public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
        throws AMQStoreException
{
    if (_state == State.RECOVERING)
    {
        return;
    }

    try
    {
        final Connection connection = newAutoCommitConnection();
        try
        {
            final PreparedStatement findStmt = connection.prepareStatement(FIND_BINDING);
            try
            {
                findStmt.setString(1, exchange.getNameShortString().toString() );
                findStmt.setString(2, queue.getNameShortString().toString());
                findStmt.setString(3, routingKey == null ? null : routingKey.toString());

                final ResultSet existing = findStmt.executeQuery();
                try
                {
                    // If this binding is not already in the store then create it.
                    final boolean alreadyStored = existing.next();
                    if (!alreadyStored)
                    {
                        final PreparedStatement insertStmt = connection.prepareStatement(INSERT_INTO_BINDINGS);
                        try
                        {
                            insertStmt.setString(1, exchange.getNameShortString().toString() );
                            insertStmt.setString(2, queue.getNameShortString().toString());
                            insertStmt.setString(3, routingKey == null ? null : routingKey.toString());
                            if (args == null)
                            {
                                insertStmt.setNull(4, Types.BLOB);
                            }
                            else
                            {
                                // TODO - In Java 6 we could use create/set Blob
                                final byte[] encodedArgs = args.getDataAsBytes();
                                final ByteArrayInputStream argStream = new ByteArrayInputStream(encodedArgs);
                                insertStmt.setBinaryStream(4, argStream, encodedArgs.length);
                            }
                            insertStmt.executeUpdate();
                        }
                        finally
                        {
                            insertStmt.close();
                        }
                    }
                }
                finally
                {
                    existing.close();
                }
            }
            finally
            {
                findStmt.close();
            }
        }
        finally
        {
            connection.close();
        }
    }
    catch (SQLException sqlFailure)
    {
        throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
                                    + exchange.getNameShortString() + " to database: " + sqlFailure.getMessage(), sqlFailure);
    }
}
/**
 * Deletes the stored binding identified by exchange, queue and routing key.
 *
 * @param exchange   exchange the queue was bound to
 * @param routingKey binding key; {@code null} is passed to the database as SQL NULL
 * @param queue      queue that was bound
 * @param args       binding arguments; not used to identify the row
 * @throws AMQStoreException if the delete did not affect exactly one row, or the database
 *                           access fails
 */
public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
        throws AMQStoreException
{
    Connection connection = null;
    PreparedStatement deleteStmt = null;
    try
    {
        connection = newAutoCommitConnection();
        // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
        deleteStmt = connection.prepareStatement(DELETE_FROM_BINDINGS);
        deleteStmt.setString(1, exchange.getNameShortString().toString() );
        deleteStmt.setString(2, queue.getNameShortString().toString());
        deleteStmt.setString(3, routingKey == null ? null : routingKey.toString());

        final int removedRows = deleteStmt.executeUpdate();
        final boolean exactlyOneRemoved = (removedRows == 1);
        if (!exactlyOneRemoved)
        {
            throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
                                        + exchange.getNameShortString() + " not found");
        }
    }
    catch (SQLException sqlFailure)
    {
        throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
                                    + exchange.getNameShortString() + " in database: " + sqlFailure.getMessage(), sqlFailure);
    }
    finally
    {
        // Statement first, then the connection it belongs to.
        closePreparedStatement(deleteStmt);
        closeConnection(connection);
    }
}
/**
 * Stores the queue without arguments; delegates to
 * {@link #createQueue(AMQQueue, FieldTable)} with {@code null} arguments.
 *
 * @param queue queue to persist
 * @throws AMQStoreException if the database access fails
 */
public void createQueue(AMQQueue queue) throws AMQStoreException
{
createQueue(queue, null);
}
/**
 * Stores the queue (name, owner, exclusivity and arguments) unless a row with its name
 * already exists. Does nothing while the store is in the RECOVERING state.
 *
 * The connection is always closed, including when the lookup or the insert fails.
 *
 * @param queue     queue to persist
 * @param arguments queue arguments, stored as a blob; {@code null} is stored as an empty blob
 * @throws AMQStoreException if the database access fails
 */
public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
{
    _logger.debug("public void createQueue(AMQQueue queue = " + queue + "): called");
    if (_state != State.RECOVERING)
    {
        try
        {
            Connection conn = newAutoCommitConnection();
            try
            {
                PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
                try
                {
                    stmt.setString(1, queue.getNameShortString().toString());
                    ResultSet rs = stmt.executeQuery();
                    try
                    {
                        // If we don't have any data in the result set then we can add this queue
                        if (!rs.next())
                        {
                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE);
                            try
                            {
                                String owner = queue.getOwner() == null ? null : queue.getOwner().toString();
                                insertStmt.setString(1, queue.getNameShortString().toString());
                                insertStmt.setString(2, owner);
                                insertStmt.setBoolean(3,queue.isExclusive());
                                final byte[] underlying;
                                if(arguments != null)
                                {
                                    underlying = arguments.getDataAsBytes();
                                }
                                else
                                {
                                    // No arguments: store an empty blob rather than SQL NULL.
                                    underlying = new byte[0];
                                }
                                ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
                                insertStmt.setBinaryStream(4,bis,underlying.length);
                                insertStmt.execute();
                            }
                            finally
                            {
                                insertStmt.close();
                            }
                        }
                    }
                    finally
                    {
                        rs.close();
                    }
                }
                finally
                {
                    stmt.close();
                }
            }
            finally
            {
                // Close in finally so a failed lookup or insert does not leak the connection.
                conn.close();
            }
        }
        catch (SQLException e)
        {
            throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
        }
    }
}
/**
 * Updates the stored entry for a queue, but only when an entry already exists; a queue
 * that is absent from the store is not added.
 *
 * Nothing is done while the store is in the RECOVERING state.
 *
 * NOTE: only the exclusivity flag is written.
 *
 * @param queue The queue whose entry should be updated.
 * @throws AMQStoreException If any SQL operation fails.
 */
public void updateQueue(final AMQQueue queue) throws AMQStoreException
{
    if (_state == State.RECOVERING)
    {
        return;
    }

    try
    {
        final Connection connection = newAutoCommitConnection();
        try
        {
            final PreparedStatement findStmt = connection.prepareStatement(FIND_QUEUE);
            try
            {
                findStmt.setString(1, queue.getNameShortString().toString());
                final ResultSet existing = findStmt.executeQuery();
                try
                {
                    // Only touch the row when the lookup found one.
                    if (existing.next())
                    {
                        final PreparedStatement updateStmt = connection.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
                        try
                        {
                            updateStmt.setBoolean(1, queue.isExclusive());
                            updateStmt.setString(2, queue.getNameShortString().toString());
                            updateStmt.execute();
                        }
                        finally
                        {
                            updateStmt.close();
                        }
                    }
                }
                finally
                {
                    existing.close();
                }
            }
            finally
            {
                findStmt.close();
            }
        }
        finally
        {
            connection.close();
        }
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
    }
}
/**
 * Obtains a connection from {@link #newConnection()} and switches it to auto-commit mode.
 *
 * If enabling auto-commit fails, the connection is closed and the original
 * SQLException is rethrown; any failure raised while closing is discarded.
 *
 * @return an open connection with auto-commit enabled
 * @throws SQLException if the connection cannot be opened or configured
 */
private Connection newAutoCommitConnection() throws SQLException
{
    final Connection autoCommitConnection = newConnection();
    try
    {
        autoCommitConnection.setAutoCommit(true);
        return autoCommitConnection;
    }
    catch (SQLException configurationFailure)
    {
        try
        {
            autoCommitConnection.close();
        }
        finally
        {
            // Deliberately overrides anything thrown by close(): the configuration failure wins.
            throw configurationFailure;
        }
    }
}
/**
 * Opens a connection to {@code _connectionURL} with auto-commit disabled and the
 * isolation level set to TRANSACTION_READ_COMMITTED.
 *
 * If either setting fails, the connection is closed and the original SQLException is
 * rethrown; any failure raised while closing is discarded.
 *
 * @return an open, transactional connection
 * @throws SQLException if the connection cannot be opened or configured
 */
private Connection newConnection() throws SQLException
{
    final Connection transactionalConnection = DriverManager.getConnection(_connectionURL);
    try
    {
        transactionalConnection.setAutoCommit(false);
        transactionalConnection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        return transactionalConnection;
    }
    catch (SQLException configurationFailure)
    {
        try
        {
            transactionalConnection.close();
        }
        finally
        {
            // Deliberately overrides anything thrown by close(): the configuration failure wins.
            throw configurationFailure;
        }
    }
}
/**
 * Deletes the stored entry for the given queue.
 *
 * @param queue the queue to remove; its short-string name identifies the row
 * @throws AMQStoreException if no row was deleted, or on any SQL error
 */
public void removeQueue(final AMQQueue queue) throws AMQStoreException
{
    final AMQShortString name = queue.getNameShortString();
    _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");

    Connection connection = null;
    PreparedStatement deleteStmt = null;
    try
    {
        connection = newAutoCommitConnection();
        deleteStmt = connection.prepareStatement(DELETE_FROM_QUEUE);
        deleteStmt.setString(1, name.toString());

        final int deletedRows = deleteStmt.executeUpdate();
        if (deletedRows == 0)
        {
            throw new AMQStoreException("Queue " + name + " not found");
        }
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e);
    }
    finally
    {
        closePreparedStatement(deleteStmt);
        closeConnection(connection);
    }
}
/**
 * Stores the given broker link, unless the store is currently in the RECOVERING state.
 *
 * The link is looked up by the two halves of its id with {@code FIND_LINK}; a row is
 * inserted only when the lookup returns nothing. The link's arguments are serialised
 * with {@link #convertStringMapToBytes(Map)} and written as a binary stream.
 *
 * The connection is released in a {@code finally} block so that it is not leaked when
 * preparing or executing either statement fails.
 *
 * @param link the broker link to persist
 * @throws AMQStoreException if any SQL operation (including closing resources) fails
 */
public void createBrokerLink(final BrokerLink link) throws AMQStoreException
{
    _logger.debug("public void createBrokerLink(BrokerLink = " + link + "): called");
    if (_state != State.RECOVERING)
    {
        try
        {
            Connection conn = newAutoCommitConnection();
            try
            {
                PreparedStatement stmt = conn.prepareStatement(FIND_LINK);
                try
                {
                    stmt.setLong(1, link.getId().getLeastSignificantBits());
                    stmt.setLong(2, link.getId().getMostSignificantBits());
                    ResultSet rs = stmt.executeQuery();
                    try
                    {
                        // If we don't have any data in the result set then we can add this link
                        if (!rs.next())
                        {
                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_LINKS);
                            try
                            {
                                insertStmt.setLong(1, link.getId().getLeastSignificantBits());
                                insertStmt.setLong(2, link.getId().getMostSignificantBits());
                                insertStmt.setLong(3, link.getCreateTime());

                                byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
                                insertStmt.setBinaryStream(4, bis, argumentBytes.length);
                                insertStmt.execute();
                            }
                            finally
                            {
                                insertStmt.close();
                            }
                        }
                    }
                    finally
                    {
                        rs.close();
                    }
                }
                finally
                {
                    stmt.close();
                }
            }
            finally
            {
                // Previously only reached on the success path, leaking the connection on failure.
                conn.close();
            }
        }
        catch (SQLException e)
        {
            throw new AMQStoreException("Error writing " + link + " to database: " + e.getMessage(), e);
        }
    }
}
/**
 * Serialises a string-to-string map into a byte array.
 *
 * The layout is the entry count as an int, followed by each key and value written with
 * {@link DataOutputStream#writeUTF(String)} in the map's iteration order. A null map
 * yields an empty array.
 *
 * @param arguments the map to serialise, may be null
 * @return the serialised bytes; never null
 * @throws AMQStoreException if writing to the in-memory stream raises an IOException
 */
private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws AMQStoreException
{
    if (arguments == null)
    {
        return new byte[0];
    }

    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(buffer);
    try
    {
        out.writeInt(arguments.size());
        for (Map.Entry<String, String> entry : arguments.entrySet())
        {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
    }
    catch (IOException e)
    {
        // This should never happen
        throw new AMQStoreException(e.getMessage(), e);
    }
    return buffer.toByteArray();
}
/**
 * Deletes the stored entry for the given broker link, identified by the two halves of its id.
 *
 * @param link the broker link to remove
 * @throws AMQStoreException if no row was deleted, or on any SQL error
 */
public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
{
    _logger.debug("public void deleteBrokerLink( " + link + "): called");

    Connection connection = null;
    PreparedStatement deleteStmt = null;
    try
    {
        connection = newAutoCommitConnection();
        deleteStmt = connection.prepareStatement(DELETE_FROM_LINKS);
        deleteStmt.setLong(1, link.getId().getLeastSignificantBits());
        deleteStmt.setLong(2, link.getId().getMostSignificantBits());

        final int deletedRows = deleteStmt.executeUpdate();
        if (deletedRows == 0)
        {
            throw new AMQStoreException("Link " + link + " not found");
        }
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error deleting Link " + link + " from database: " + e.getMessage(), e);
    }
    finally
    {
        closePreparedStatement(deleteStmt);
        closeConnection(connection);
    }
}
/**
 * Stores the given bridge, unless the store is currently in the RECOVERING state.
 *
 * The bridge is looked up by the two halves of its id with {@code FIND_BRIDGE}; a row is
 * inserted only when the lookup returns nothing. The inserted row carries the bridge id,
 * its create time, the id of its link, and its arguments serialised with
 * {@link #convertStringMapToBytes(Map)}.
 *
 * The connection is released in a {@code finally} block so that it is not leaked when
 * preparing or executing either statement fails.
 *
 * @param bridge the bridge to persist
 * @throws AMQStoreException if any SQL operation (including closing resources) fails
 */
public void createBridge(final Bridge bridge) throws AMQStoreException
{
    _logger.debug("public void createBridge(BrokerLink = " + bridge + "): called");
    if (_state != State.RECOVERING)
    {
        try
        {
            Connection conn = newAutoCommitConnection();
            try
            {
                PreparedStatement stmt = conn.prepareStatement(FIND_BRIDGE);
                try
                {
                    UUID id = bridge.getId();
                    stmt.setLong(1, id.getLeastSignificantBits());
                    stmt.setLong(2, id.getMostSignificantBits());
                    ResultSet rs = stmt.executeQuery();
                    try
                    {
                        // If we don't have any data in the result set then we can add this bridge
                        if (!rs.next())
                        {
                            PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BRIDGES);
                            try
                            {
                                insertStmt.setLong(1, id.getLeastSignificantBits());
                                insertStmt.setLong(2, id.getMostSignificantBits());
                                insertStmt.setLong(3, bridge.getCreateTime());

                                UUID linkId = bridge.getLink().getId();
                                insertStmt.setLong(4, linkId.getLeastSignificantBits());
                                insertStmt.setLong(5, linkId.getMostSignificantBits());

                                byte[] argumentBytes = convertStringMapToBytes(bridge.getArguments());
                                ByteArrayInputStream bis = new ByteArrayInputStream(argumentBytes);
                                insertStmt.setBinaryStream(6, bis, argumentBytes.length);
                                insertStmt.execute();
                            }
                            finally
                            {
                                insertStmt.close();
                            }
                        }
                    }
                    finally
                    {
                        rs.close();
                    }
                }
                finally
                {
                    stmt.close();
                }
            }
            finally
            {
                // Previously only reached on the success path, leaking the connection on failure.
                conn.close();
            }
        }
        catch (SQLException e)
        {
            throw new AMQStoreException("Error writing " + bridge + " to database: " + e.getMessage(), e);
        }
    }
}
/**
 * Deletes the stored entry for the given bridge, identified by the two halves of its id.
 *
 * @param bridge the bridge to remove
 * @throws AMQStoreException if no row was deleted, or on any SQL error
 */
public void deleteBridge(final Bridge bridge) throws AMQStoreException
{
    _logger.debug("public void deleteBridge( " + bridge + "): called");

    Connection connection = null;
    PreparedStatement deleteStmt = null;
    try
    {
        connection = newAutoCommitConnection();
        deleteStmt = connection.prepareStatement(DELETE_FROM_BRIDGES);
        deleteStmt.setLong(1, bridge.getId().getLeastSignificantBits());
        deleteStmt.setLong(2, bridge.getId().getMostSignificantBits());

        final int deletedRows = deleteStmt.executeUpdate();
        if (deletedRows == 0)
        {
            throw new AMQStoreException("Bridge " + bridge + " not found");
        }
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error deleting bridge " + bridge + " from database: " + e.getMessage(), e);
    }
    finally
    {
        closePreparedStatement(deleteStmt);
        closeConnection(connection);
    }
}
/**
 * Creates a new store transaction backed by its own connection.
 *
 * @return a fresh {@code DerbyTransaction}
 */
public Transaction newTransaction()
{
    final Transaction transaction = new DerbyTransaction();
    return transaction;
}
/**
 * Records, on the transaction's connection, that a message is enqueued on a queue.
 *
 * The connection is neither committed nor closed here.
 *
 * @param connWrapper holder of the connection to use
 * @param queue       the resource whose name identifies the queue
 * @param messageId   the id of the message being enqueued
 * @throws AMQStoreException if the insert fails
 */
public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
    final String queueName = queue.getResourceName();
    final Connection connection = connWrapper.getConnection();
    try
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Enqueuing message " + messageId + " on queue " + queueName + "[Connection" + connection + "]");
        }

        final PreparedStatement insertStmt = connection.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
        try
        {
            insertStmt.setString(1, queueName);
            insertStmt.setLong(2, messageId);
            insertStmt.executeUpdate();
        }
        finally
        {
            insertStmt.close();
        }
    }
    catch (SQLException e)
    {
        _logger.error("Failed to enqueue: " + e.getMessage(), e);
        throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + queueName
                                    + " to database", e);
    }
}
/**
 * Removes, on the transaction's connection, the record of a message being enqueued on a queue.
 *
 * The connection is neither committed nor closed here.
 *
 * @param connWrapper holder of the connection to use
 * @param queue       the resource whose name identifies the queue
 * @param messageId   the id of the message being dequeued
 * @throws AMQStoreException if the delete does not affect exactly one row, or on any SQL error
 */
public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
    final String queueName = queue.getResourceName();
    final Connection connection = connWrapper.getConnection();
    try
    {
        final PreparedStatement deleteStmt = connection.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
        try
        {
            deleteStmt.setString(1, queueName);
            deleteStmt.setLong(2, messageId);

            final int deletedRows = deleteStmt.executeUpdate();
            if (deletedRows != 1)
            {
                throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + queueName);
            }

            if (_logger.isDebugEnabled())
            {
                _logger.debug("Dequeuing message " + messageId + " on queue " + queueName );
            }
        }
        finally
        {
            deleteStmt.close();
        }
    }
    catch (SQLException e)
    {
        _logger.error("Failed to dequeue: " + e.getMessage(), e);
        throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + queueName
                                    + " from database", e);
    }
}
/**
 * Deletes a recorded xid and then its associated actions, on the transaction's connection.
 *
 * The xid row is deleted first and must match exactly one row; the actions delete is
 * then executed without checking how many rows it removed.
 *
 * @param connWrapper holder of the connection to use
 * @param format      xid format
 * @param globalId    xid global id
 * @param branchId    xid branch id
 * @throws AMQStoreException if the xid row is not found, or on any SQL error
 */
private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
        throws AMQStoreException
{
    final Connection connection = connWrapper.getConnection();
    try
    {
        final PreparedStatement xidStmt = connection.prepareStatement(DELETE_FROM_XIDS);
        try
        {
            xidStmt.setLong(1, format);
            xidStmt.setBytes(2, globalId);
            xidStmt.setBytes(3, branchId);

            if (xidStmt.executeUpdate() != 1)
            {
                throw new AMQStoreException("Unable to find message with xid");
            }
        }
        finally
        {
            xidStmt.close();
        }

        final PreparedStatement actionsStmt = connection.prepareStatement(DELETE_FROM_XID_ACTIONS);
        try
        {
            actionsStmt.setLong(1, format);
            actionsStmt.setBytes(2, globalId);
            actionsStmt.setBytes(3, branchId);
            actionsStmt.executeUpdate();
        }
        finally
        {
            actionsStmt.close();
        }
    }
    catch (SQLException e)
    {
        _logger.error("Failed to dequeue: " + e.getMessage(), e);
        throw new AMQStoreException("Error deleting enqueued message with xid", e);
    }
}
/**
 * Records an xid and the enqueue/dequeue actions associated with it, on the
 * transaction's connection.
 *
 * One row is inserted for the xid itself, then one action row per record: enqueues are
 * tagged "E" and dequeues "D". Either array may be null, in which case it is skipped.
 *
 * @param connWrapper holder of the connection to use
 * @param format      xid format
 * @param globalId    xid global id
 * @param branchId    xid branch id
 * @param enqueues    enqueue records to store, may be null
 * @param dequeues    dequeue records to store, may be null
 * @throws AMQStoreException on any SQL error
 */
private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
                       Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws AMQStoreException
{
    final Connection connection = connWrapper.getConnection();
    try
    {
        final PreparedStatement xidStmt = connection.prepareStatement(INSERT_INTO_XIDS);
        try
        {
            xidStmt.setLong(1, format);
            xidStmt.setBytes(2, globalId);
            xidStmt.setBytes(3, branchId);
            xidStmt.executeUpdate();
        }
        finally
        {
            xidStmt.close();
        }

        final PreparedStatement actionStmt = connection.prepareStatement(INSERT_INTO_XID_ACTIONS);
        try
        {
            // The xid columns are the same for every action row, so bind them once.
            actionStmt.setLong(1, format);
            actionStmt.setBytes(2, globalId);
            actionStmt.setBytes(3, branchId);

            if (enqueues != null)
            {
                actionStmt.setString(4, "E");
                for (Transaction.Record enqueue : enqueues)
                {
                    actionStmt.setString(5, enqueue.getQueue().getResourceName());
                    actionStmt.setLong(6, enqueue.getMessage().getMessageNumber());
                    actionStmt.executeUpdate();
                }
            }

            if (dequeues != null)
            {
                actionStmt.setString(4, "D");
                for (Transaction.Record dequeue : dequeues)
                {
                    actionStmt.setString(5, dequeue.getQueue().getResourceName());
                    actionStmt.setLong(6, dequeue.getMessage().getMessageNumber());
                    actionStmt.executeUpdate();
                }
            }
        }
        finally
        {
            actionStmt.close();
        }
    }
    catch (SQLException e)
    {
        _logger.error("Failed to enqueue: " + e.getMessage(), e);
        throw new AMQStoreException("Error writing xid ", e);
    }
}
/**
 * Immutable holder for the JDBC connection a transaction operates on.
 */
private static final class ConnectionWrapper
{
    /** The wrapped connection, exactly as supplied to the constructor. */
    private final Connection _connection;

    /**
     * @param conn the connection to hold
     */
    public ConnectionWrapper(Connection conn)
    {
        this._connection = conn;
    }

    /**
     * @return the connection supplied at construction
     */
    public Connection getConnection()
    {
        return this._connection;
    }
}
/**
 * Commits the transaction on the wrapped connection and then closes that connection.
 *
 * If the commit itself fails the connection is not closed by this method.
 *
 * @param connWrapper holder of the connection to commit
 * @throws AMQStoreException if committing or closing raises an SQLException
 */
public void commitTran(ConnectionWrapper connWrapper) throws AMQStoreException
{
    final Connection connection = connWrapper.getConnection();
    try
    {
        connection.commit();

        if (_logger.isDebugEnabled())
        {
            _logger.debug("commit tran completed");
        }

        connection.close();
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error commit tx: " + e.getMessage(), e);
    }
}
/**
 * Commits the transaction synchronously via {@link #commitTran(ConnectionWrapper)} and
 * returns a new {@code CommitStoreFuture}.
 *
 * @param connWrapper holder of the connection to commit
 * @return a new {@code CommitStoreFuture}, created after the commit has returned
 * @throws AMQStoreException if the commit fails
 */
public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws AMQStoreException
{
    commitTran(connWrapper);
    final StoreFuture commitFuture = new CommitStoreFuture();
    return commitFuture;
}
/**
 * Rolls back the transaction on the wrapped connection and closes that connection.
 *
 * If the rollback fails, the connection is still closed on a best-effort basis (close
 * failures are only logged in that case) so that it is not leaked, and the rollback
 * failure is reported to the caller.
 *
 * @param connWrapper holder of the connection to roll back; must not be null
 * @throws AMQStoreException if {@code connWrapper} is null, or if rolling back or
 *                           closing raises an SQLException
 */
public void abortTran(ConnectionWrapper connWrapper) throws AMQStoreException
{
    if (connWrapper == null)
    {
        throw new AMQStoreException("Fatal internal error: transactional context is empty at abortTran");
    }
    if (_logger.isDebugEnabled())
    {
        _logger.debug("abort tran called: " + connWrapper.getConnection());
    }
    try
    {
        Connection conn = connWrapper.getConnection();
        try
        {
            conn.rollback();
        }
        catch (SQLException rollbackFailure)
        {
            // Release the connection without letting a close error mask the rollback failure.
            closeConnection(conn);
            throw rollbackFailure;
        }
        conn.close();
    }
    catch (SQLException e)
    {
        throw new AMQStoreException("Error aborting transaction: " + e.getMessage(), e);
    }
}
/**
 * Increments the message id counter and returns the new value.
 *
 * @return the incremented counter value
 */
public Long getNewMessageId()
{
    return Long.valueOf(_messageId.incrementAndGet());
}
/**
 * Inserts the meta data of a message using the supplied connection.
 *
 * The stored blob is one byte holding the ordinal of the meta data type, followed by
 * the bytes the meta data writes into the remainder of the buffer.
 *
 * @param conn      the connection to insert on; not closed here
 * @param messageId the id of the message the meta data belongs to
 * @param metaData  the meta data to store
 * @throws SQLException     on any SQL error
 * @throws RuntimeException if the insert reports zero affected rows
 */
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
        throws SQLException
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Adding metadata for message " +messageId);
    }

    final PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_META_DATA);
    try
    {
        insertStmt.setLong(1, messageId);

        final int bodySize = 1 + metaData.getStorableSize();
        final byte[] serialised = new byte[bodySize];
        serialised[0] = (byte) metaData.getType().ordinal();

        // Hand the meta data a view that starts just after the type byte.
        final ByteBuffer payloadView = ByteBuffer.wrap(serialised, 1, bodySize - 1).slice();
        metaData.writeToBuffer(0, payloadView);

        final ByteArrayInputStream blobStream = new ByteArrayInputStream(serialised);
        try
        {
            insertStmt.setBinaryStream(2, blobStream, serialised.length);
            if (insertStmt.executeUpdate() == 0)
            {
                throw new RuntimeException("Unable to add meta data for message " +messageId);
            }
        }
        finally
        {
            try
            {
                blobStream.close();
            }
            catch (IOException e)
            {
                throw new SQLException(e);
            }
        }
    }
    finally
    {
        insertStmt.close();
    }
}
/**
 * Reads every stored meta data row and hands a {@code StoredDerbyMessage} for each one
 * to the recovery handler.
 *
 * The first byte of each blob selects the {@code MessageMetaDataType}; the rest is passed
 * to that type's factory. After the last row, the message id counter is set to the
 * highest id seen (0 if there were no rows) and the handler is told recovery is complete.
 *
 * @param recoveryHandler the handler to feed recovered messages to
 * @throws SQLException on any SQL error
 */
private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
    final Connection connection = newAutoCommitConnection();
    try
    {
        final MessageStoreRecoveryHandler.StoredMessageRecoveryHandler messageHandler = recoveryHandler.begin();
        final Statement query = connection.createStatement();
        try
        {
            final ResultSet rows = query.executeQuery(SELECT_ALL_FROM_META_DATA);
            try
            {
                long highestId = 0;
                while (rows.next())
                {
                    final long messageId = rows.getLong(1);
                    final Blob blob = rows.getBlob(2);
                    highestId = Math.max(highestId, messageId);

                    final byte[] serialised = blob.getBytes(1, (int) blob.length());

                    // Skip the leading type byte to obtain the meta data payload.
                    ByteBuffer payload = ByteBuffer.wrap(serialised);
                    payload.position(1);
                    payload = payload.slice();

                    final MessageMetaDataType type = MessageMetaDataType.values()[serialised[0]];
                    final StorableMessageMetaData metaData = type.getFactory().createMetaData(payload);
                    messageHandler.message(new StoredDerbyMessage(messageId, metaData, false));
                }

                _messageId.set(highestId);
                messageHandler.completeMessageRecovery();
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            query.close();
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Reads every stored queue entry and reports each (queue name, message id) pair to the
 * recovery handler.
 *
 * @param recoveryHandler the handler to feed recovered queue entries to
 * @return the handler returned by {@code completeQueueEntryRecovery()}
 * @throws SQLException on any SQL error
 */
private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
    final Connection connection = newAutoCommitConnection();
    try
    {
        final TransactionLogRecoveryHandler.QueueEntryRecoveryHandler entryHandler = recoveryHandler.begin(this);
        final Statement query = connection.createStatement();
        try
        {
            final ResultSet rows = query.executeQuery(SELECT_FROM_QUEUE_ENTRY);
            try
            {
                while (rows.next())
                {
                    final String queueName = rows.getString(1);
                    final long messageId = rows.getLong(2);
                    entryHandler.queueEntry(queueName, messageId);
                }
            }
            finally
            {
                rows.close();
            }
        }
        finally
        {
            query.close();
        }

        // Statement and result set are released before completion is signalled.
        return entryHandler.completeQueueEntryRecovery();
    }
    finally
    {
        connection.close();
    }
}
/**
 * Simple value holder for the three parts of an xid: format, global id and branch id.
 *
 * The byte arrays are stored and returned by reference; no copies are made.
 */
private static final class Xid
{
    private final long _format;
    private final byte[] _globalId;
    private final byte[] _branchId;

    /**
     * @param format   xid format
     * @param globalId xid global id
     * @param branchId xid branch id
     */
    public Xid(long format, byte[] globalId, byte[] branchId)
    {
        this._format = format;
        this._globalId = globalId;
        this._branchId = branchId;
    }

    /** @return the xid format */
    public long getFormat()
    {
        return this._format;
    }

    /** @return the global id array supplied at construction */
    public byte[] getGlobalId()
    {
        return this._globalId;
    }

    /** @return the branch id array supplied at construction */
    public byte[] getBranchId()
    {
        return this._branchId;
    }
}
/**
 * A (queue name, message number) pair that acts as its own transaction record, its own
 * queue resource and its own message, so a single object can satisfy all three interfaces.
 */
private static class RecordImpl implements MessageStore.Transaction.Record, TransactionLogResource, EnqueableMessage
{
    private final String _queueName;
    private long _messageNumber;

    /**
     * @param queueName     name reported by {@link #getResourceName()}
     * @param messageNumber number reported by {@link #getMessageNumber()}
     */
    public RecordImpl(String queueName, long messageNumber)
    {
        this._queueName = queueName;
        this._messageNumber = messageNumber;
    }

    /** @return this object, which doubles as the queue resource */
    public TransactionLogResource getQueue()
    {
        return this;
    }

    /** @return this object, which doubles as the message */
    public EnqueableMessage getMessage()
    {
        return this;
    }

    /** @return the message number supplied at construction */
    public long getMessageNumber()
    {
        return this._messageNumber;
    }

    /** @return always {@code true} */
    public boolean isPersistent()
    {
        return true;
    }

    /**
     * Not supported by this record type.
     *
     * @throws UnsupportedOperationException always
     */
    public StoredMessage getStoredMessage()
    {
        throw new UnsupportedOperationException();
    }

    /** @return the queue name supplied at construction */
    public String getResourceName()
    {
        return this._queueName;
    }
}
/**
 * Reads every stored xid together with its recorded actions and reports each one to the
 * recovery handler.
 *
 * All xids are loaded first; then, per xid, its action rows are split into enqueues
 * (action type "E") and dequeues (any other action type) and passed to
 * {@code dtxRecord}. Completion is signalled after the last xid.
 *
 * @param dtxrh the handler to feed recovered xid records to
 * @throws SQLException on any SQL error
 */
private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
{
    final Connection connection = newAutoCommitConnection();
    try
    {
        final List<Xid> recordedXids = new ArrayList<Xid>();
        final Statement xidQuery = connection.createStatement();
        try
        {
            final ResultSet xidRows = xidQuery.executeQuery(SELECT_ALL_FROM_XIDS);
            try
            {
                while (xidRows.next())
                {
                    recordedXids.add(new Xid(xidRows.getLong(1), xidRows.getBytes(2), xidRows.getBytes(3)));
                }
            }
            finally
            {
                xidRows.close();
            }
        }
        finally
        {
            xidQuery.close();
        }

        for (Xid xid : recordedXids)
        {
            final List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
            final List<RecordImpl> dequeues = new ArrayList<RecordImpl>();

            final PreparedStatement actionQuery = connection.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
            try
            {
                actionQuery.setLong(1, xid.getFormat());
                actionQuery.setBytes(2, xid.getGlobalId());
                actionQuery.setBytes(3, xid.getBranchId());

                final ResultSet actionRows = actionQuery.executeQuery();
                try
                {
                    while (actionRows.next())
                    {
                        final String actionType = actionRows.getString(1);
                        final String queueName = actionRows.getString(2);
                        final long messageId = actionRows.getLong(3);
                        final RecordImpl record = new RecordImpl(queueName, messageId);

                        if ("E".equals(actionType))
                        {
                            enqueues.add(record);
                        }
                        else
                        {
                            dequeues.add(record);
                        }
                    }
                }
                finally
                {
                    actionRows.close();
                }
            }
            finally
            {
                actionQuery.close();
            }

            dtxrh.dtxRecord(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
                            enqueues.toArray(new RecordImpl[enqueues.size()]),
                            dequeues.toArray(new RecordImpl[dequeues.size()]));
        }

        dtxrh.completeDtxRecordRecovery();
    }
    finally
    {
        connection.close();
    }
}
/**
 * Loads the meta data stored for the given message id.
 *
 * The first byte of the stored blob selects the {@code MessageMetaDataType}; the rest is
 * passed to that type's factory.
 *
 * @param messageId the id of the message whose meta data is wanted
 * @return the reconstructed meta data
 * @throws SQLException     on any SQL error
 * @throws RuntimeException if no row exists for the message id
 */
StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
    final Connection connection = newAutoCommitConnection();
    try
    {
        final PreparedStatement selectStmt = connection.prepareStatement(SELECT_FROM_META_DATA);
        try
        {
            selectStmt.setLong(1, messageId);
            final ResultSet row = selectStmt.executeQuery();
            try
            {
                if (!row.next())
                {
                    throw new RuntimeException("Meta data not found for message with id " + messageId);
                }

                final Blob blob = row.getBlob(1);
                final byte[] serialised = blob.getBytes(1, (int) blob.length());

                // Skip the leading type byte to obtain the meta data payload.
                ByteBuffer payload = ByteBuffer.wrap(serialised);
                payload.position(1);
                payload = payload.slice();

                final MessageMetaDataType type = MessageMetaDataType.values()[serialised[0]];
                return type.getFactory().createMetaData(payload);
            }
            finally
            {
                row.close();
            }
        }
        finally
        {
            selectStmt.close();
        }
    }
    finally
    {
        connection.close();
    }
}
/**
 * Inserts one chunk of message content using the supplied connection.
 *
 * The remaining bytes of {@code src} are copied without moving its position. The row
 * records the chunk's start offset and its end offset (start plus chunk length). On an
 * SQL failure the supplied connection is closed before the error is rethrown unchecked.
 *
 * @param conn      the connection to insert on
 * @param messageId the id of the message the content belongs to
 * @param offset    offset of this chunk within the message
 * @param src       buffer whose remaining bytes form the chunk
 * @throws RuntimeException wrapping any SQLException
 */
private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
{
    if (_logger.isDebugEnabled())
    {
        _logger.debug("Adding content chunk offset " + offset + " for message " +messageId);
    }

    // Work on a private view so the caller's buffer position is left untouched.
    final ByteBuffer chunkView = src.slice();
    final byte[] chunkBytes = new byte[chunkView.limit()];
    chunkView.get(chunkBytes);

    PreparedStatement insertStmt = null;
    try
    {
        insertStmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
        insertStmt.setLong(1, messageId);
        insertStmt.setInt(2, offset);
        insertStmt.setInt(3, offset + chunkBytes.length);

        // TODO in Java 6 we could just use blobs
        final ByteArrayInputStream chunkStream = new ByteArrayInputStream(chunkBytes);
        insertStmt.setBinaryStream(4, chunkStream, chunkBytes.length);
        insertStmt.executeUpdate();
    }
    catch (SQLException e)
    {
        closeConnection(conn);
        throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
    }
    finally
    {
        closePreparedStatement(insertStmt);
    }
}
/**
 * Copies stored message content, starting at {@code offset}, into {@code dst}.
 *
 * The query is parameterised with the message id, the start offset and the start offset
 * plus the space remaining in {@code dst}. Rows are consumed in the order returned, and
 * copying stops once {@code dst} is full or the rows run out.
 *
 * @param messageId the id of the message whose content is wanted
 * @param offset    offset within the message to start reading from
 * @param dst       buffer to copy into
 * @return the number of bytes copied into {@code dst}
 * @throws RuntimeException wrapping any SQLException
 */
public int getContent(long messageId, int offset, ByteBuffer dst)
{
    Connection connection = null;
    PreparedStatement selectStmt = null;
    try
    {
        connection = newAutoCommitConnection();
        selectStmt = connection.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
        selectStmt.setLong(1, messageId);
        selectStmt.setInt(2, offset);
        selectStmt.setInt(3, offset + dst.remaining());

        final ResultSet chunks = selectStmt.executeQuery();
        int totalCopied = 0;
        boolean destinationFull = false;
        while (!destinationFull && chunks.next())
        {
            final int chunkOffset = chunks.getInt(1);
            final Blob chunkBlob = chunks.getBlob(2);
            final int chunkSize = (int) chunkBlob.length();
            final byte[] chunkBytes = chunkBlob.getBytes(1, chunkSize);

            // Where, inside this chunk, the next wanted byte lives.
            final int startInChunk = offset + totalCopied - chunkOffset;
            final int toCopy = Math.min(chunkSize - startInChunk, dst.remaining());

            dst.put(chunkBytes, startInChunk, toCopy);
            totalCopied += toCopy;
            destinationFull = (dst.remaining() == 0);
        }
        return totalCopied;
    }
    catch (SQLException e)
    {
        throw new RuntimeException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
    }
    finally
    {
        closePreparedStatement(selectStmt);
        closeConnection(connection);
    }
}
/**
 * Reports whether this store is persistent.
 *
 * @return always {@code true}
 */
public boolean isPersistent()
{
    final boolean persistent = true;
    return persistent;
}
/**
 * Moves the store to {@code newState}, provided it is currently in {@code requiredState}.
 *
 * @param requiredState the state the store must currently be in
 * @param newState      the state to move to
 * @throws AMQStoreException if the store is not in {@code requiredState}; the state is left unchanged
 */
private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
    if (_state == requiredState)
    {
        _state = newState;
        return;
    }

    throw new AMQStoreException("Cannot transition to the state: " + newState + "; need to be in state: " + requiredState
                                + "; currently in state: " + _state);
}
/**
 * Transaction implementation that owns one non-auto-commit connection and delegates
 * every operation to the enclosing store, passing that connection along.
 */
private class DerbyTransaction implements Transaction
{
    /** Holder of the connection all operations of this transaction run on. */
    private final ConnectionWrapper _connWrapper;

    /**
     * Opens the connection for this transaction.
     *
     * @throws RuntimeException wrapping the SQLException if no connection can be opened
     */
    private DerbyTransaction()
    {
        try
        {
            _connWrapper = new ConnectionWrapper(newConnection());
        }
        catch (SQLException e)
        {
            throw new RuntimeException(e);
        }
    }

    /**
     * Enqueues a message on a queue within this transaction. When the message is backed by
     * a {@code StoredDerbyMessage}, that message is first stored on this transaction's
     * connection.
     *
     * @param queue   the queue resource
     * @param message the message to enqueue
     * @throws AMQStoreException if storing the message or recording the enqueue fails
     */
    public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
    {
        final Object stored = message.getStoredMessage();
        if (stored instanceof StoredDerbyMessage)
        {
            try
            {
                ((StoredDerbyMessage) stored).store(_connWrapper.getConnection());
            }
            catch (SQLException e)
            {
                // Report the id of the message being enqueued; the store-wide _messageId
                // counter used previously does not identify the failing message.
                throw new AMQStoreException("Exception on enqueuing message " + message.getMessageNumber(), e);
            }
        }
        DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
    }

    /**
     * Dequeues a message from a queue within this transaction.
     *
     * @param queue   the queue resource
     * @param message the message to dequeue
     * @throws AMQStoreException if recording the dequeue fails
     */
    public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
    {
        DerbyMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
    }

    /**
     * Commits this transaction.
     *
     * @throws AMQStoreException if the commit fails
     */
    public void commitTran() throws AMQStoreException
    {
        DerbyMessageStore.this.commitTran(_connWrapper);
    }

    /**
     * Commits this transaction via the store's {@code commitTranAsync}.
     *
     * @return the future returned by the store
     * @throws AMQStoreException if the commit fails
     */
    public StoreFuture commitTranAsync() throws AMQStoreException
    {
        return DerbyMessageStore.this.commitTranAsync(_connWrapper);
    }

    /**
     * Rolls back this transaction.
     *
     * @throws AMQStoreException if the rollback fails
     */
    public void abortTran() throws AMQStoreException
    {
        DerbyMessageStore.this.abortTran(_connWrapper);
    }

    /**
     * Removes a recorded xid within this transaction.
     *
     * @throws AMQStoreException if the removal fails
     */
    public void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException
    {
        DerbyMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
    }

    /**
     * Records an xid and its enqueue/dequeue actions within this transaction.
     *
     * @throws AMQStoreException if recording fails
     */
    public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
            throws AMQStoreException
    {
        DerbyMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
    }
}
private class StoredDerbyMessage implements StoredMessage
{
private final long _messageId;
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
private volatile SoftReference<byte[]> _dataRef;
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
this(messageId, metaData, true);
}
StoredDerbyMessage(long messageId,
StorableMessageMetaData metaData, boolean persist)
{
_messageId = messageId;
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
if(persist)
{
_metaData = metaData;
}
}
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
try
{
metaData = DerbyMessageStore.this.getMetaData(_messageId);
}
catch (SQLException e)
{
throw new RuntimeException(e);
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
}
public long getMessageNumber()
{
return _messageId;
}
public void addContent(int offsetInMessage, java.nio.ByteBuffer src)
{
src = src.slice();
if(_data == null)
{
_data = new byte[src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
src.duplicate().get(_data);
}
else
{
byte[] oldData = _data;
_data = new byte[oldData.length + src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
System.arraycopy(oldData,0,_data,0,oldData.length);
src.duplicate().get(_data, oldData.length, src.remaining());
}
}
public int getContent(int offsetInMessage, java.nio.ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
if(data != null)
{
int length = Math.min(dst.remaining(), data.length - offsetInMessage);
dst.put(data, offsetInMessage, length);
return length;
}
else
{
return DerbyMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
getContent(offsetInMessage, buf);
buf.position(0);
return buf;
}
public synchronized StoreFuture flushToStore()
{
try
{
if(_metaData != null)
{
Connection conn = newConnection();
store(conn);
conn.commit();
conn.close();
}
}
catch (SQLException e)
{
if(_logger.isDebugEnabled())
{
_logger.debug("Error when trying to flush message " + _messageId + " to store: " + e);
}
throw new RuntimeException(e);
}
return IMMEDIATE_FUTURE;
}
private synchronized void store(final Connection conn) throws SQLException
{
if(_metaData != null)
{
try
{
storeMetaData(conn, _messageId, _metaData);
DerbyMessageStore.this.addContent(conn, _messageId, 0,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
{
_metaData = null;
_data = null;
}
}
if(_logger.isDebugEnabled())
{
_logger.debug("Storing message " + _messageId + " to store");
}
}
public void remove()
{
DerbyMessageStore.this.removeMessage(_messageId);
}
}
/**
 * Closes the given connection, logging rather than propagating any SQLException.
 *
 * @param conn the connection to close; null is ignored
 */
private void closeConnection(final Connection conn)
{
    if (conn == null)
    {
        return;
    }

    try
    {
        conn.close();
    }
    catch (SQLException e)
    {
        _logger.error("Problem closing connection", e);
    }
}
/**
 * Closes the given prepared statement, logging rather than propagating any SQLException.
 *
 * @param stmt the statement to close; null is ignored
 */
private void closePreparedStatement(final PreparedStatement stmt)
{
    if (stmt == null)
    {
        return;
    }

    try
    {
        stmt.close();
    }
    catch (SQLException e)
    {
        _logger.error("Problem closing prepared statement", e);
    }
}
}