blob: 6be5460d5f3ca15328b4a25a0fd6db653d69c50e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
import org.apache.qpid.server.store.handler.MessageHandler;
import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonProcessingException;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.Module;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;
abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
private static final String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME = "QPID_CONFIGURED_OBJECT_HIERARCHY";
private static final int DEFAULT_CONFIG_VERSION = 0;
public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME));
public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME,
META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME,
QUEUE_ENTRY_TABLE_NAME,
XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
private static final int DB_VERSION = 8;
private final AtomicLong _messageId = new AtomicLong(0);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
private static final String SELECT_FROM_DB_VERSION = "SELECT version FROM " + DB_VERSION_TABLE_NAME;
private static final String UPDATE_DB_VERSION = "UPDATE " + DB_VERSION_TABLE_NAME + " SET version = ?";
private static final String CREATE_CONFIG_VERSION_TABLE = "CREATE TABLE "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_CONFIG_VERSION = "INSERT INTO "+ CONFIGURATION_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
private static final String SELECT_FROM_CONFIG_VERSION = "SELECT version FROM " + CONFIGURATION_VERSION_TABLE_NAME;
private static final String UPDATE_CONFIG_VERSION = "UPDATE " + CONFIGURATION_VERSION_TABLE_NAME + " SET version = ?";
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ "( message_id, content ) values (?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
+ " WHERE message_id = ?";
private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";
private static final String SELECT_FROM_META_DATA =
"SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
private static final String DELETE_FROM_XIDS = "DELETE FROM " + XID_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XIDS = "SELECT format, global_id, branch_id FROM " + XID_TABLE_NAME;
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+ XID_ACTIONS_TABLE_NAME +" ( format, global_id, branch_id, action_type, " +
"queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
"SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ " ( id, object_type, attributes) VALUES (?,?,?)";
private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ " set object_type =?, attributes = ? where id = ?";
private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ " where id = ?";
private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ " where id = ?";
private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
private static final String INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY = "INSERT INTO " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ " ( child_id, parent_type, parent_id) VALUES (?,?,?)";
private static final String DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY = "DELETE FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ " where child_id = ?";
private static final String SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY = "SELECT child_id, parent_type, parent_id FROM " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME;
protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
private static final Module _module;
static
{
SimpleModule module= new SimpleModule("ConfiguredObjectSerializer", new Version(1,0,0,null));
final JsonSerializer<ConfiguredObject> serializer = new JsonSerializer<ConfiguredObject>()
{
@Override
public void serialize(final ConfiguredObject value,
final JsonGenerator jgen,
final SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeString(value.getId().toString());
}
};
module.addSerializer(ConfiguredObject.class, serializer);
_module = module;
}
protected final EventManager _eventManager = new EventManager();
private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
private boolean _initialized;
@Override
public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
if (_configurationStoreOpen.compareAndSet(false, true))
{
initialiseIfNecessary(parent.getName(), storeSettings);
try
{
createOrOpenConfigurationStoreDatabase();
upgradeIfVersionTableExists(parent);
}
catch(SQLException e)
{
throw new StoreException("Cannot create databases or upgrade", e);
}
}
}
private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
{
if (!_initialized)
{
try
{
implementationSpecificConfiguration(virtualHostName, storeSettings);
}
catch (ClassNotFoundException e)
{
throw new StoreException("Cannot find driver class", e);
}
catch (SQLException e)
{
throw new StoreException("Unexpected exception occured", e);
}
_initialized = true;
}
}
@Override
public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler)
{
checkConfigurationStoreOpen();
try
{
int configVersion = getConfigVersion();
handler.begin(configVersion);
doVisitAllConfiguredObjectRecords(handler);
int newConfigVersion = handler.end();
if(newConfigVersion != configVersion)
{
setConfigVersion(newConfigVersion);
}
}
catch (SQLException e)
{
throw new StoreException("Cannot visit configured object records", e);
}
}
private void doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
{
Connection conn = newAutoCommitConnection();
Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
final ObjectMapper objectMapper = new ObjectMapper();
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
try
{
ResultSet rs = stmt.executeQuery();
try
{
while (rs.next())
{
String id = rs.getString(1);
String objectType = rs.getString(2);
String attributes = getBlobAsString(rs, 3);
final ConfiguredObjectRecordImpl configuredObjectRecord =
new ConfiguredObjectRecordImpl(UUID.fromString(id), objectType,
objectMapper.readValue(attributes, Map.class));
configuredObjects.put(configuredObjectRecord.getId(),configuredObjectRecord);
}
}
catch (JsonMappingException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (JsonParseException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (IOException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECT_HIERARCHY);
try
{
ResultSet rs = stmt.executeQuery();
try
{
while (rs.next())
{
UUID childId = UUID.fromString(rs.getString(1));
String parentType = rs.getString(2);
UUID parentId = UUID.fromString(rs.getString(3));
ConfiguredObjectRecordImpl child = configuredObjects.get(childId);
ConfiguredObjectRecordImpl parent = configuredObjects.get(parentId);
if(child != null && parent != null)
{
child.addParent(parentType, parent);
}
else if(child != null && child.getType().endsWith("Binding") && parentType.equals("Exchange"))
{
// TODO - remove this hack for amq. exchanges
child.addParent(parentType, new ConfiguredObjectRecordImpl(parentId, parentType, Collections.<String,Object>emptyMap()));
}
}
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
}
finally
{
conn.close();
}
for(ConfiguredObjectRecord record : configuredObjects.values())
{
boolean shoudlContinue = handler.handle(record);
if (!shoudlContinue)
{
break;
}
}
}
private void checkConfigurationStoreOpen()
{
if (!_configurationStoreOpen.get())
{
throw new IllegalStateException("Configuration store is not open");
}
}
private void checkMessageStoreOpen()
{
if (!_messageStoreOpen.get())
{
throw new IllegalStateException("Message store is not open");
}
}
private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
throws SQLException {
Connection conn = newAutoCommitConnection();
try
{
if (tableExists(DB_VERSION_TABLE_NAME, conn))
{
upgradeIfNecessary(parent);
}
}
finally
{
if (conn != null)
{
conn.close();
}
}
}
@Override
public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
if (_messageStoreOpen.compareAndSet(false, true))
{
initialiseIfNecessary(parent.getName(), messageStoreSettings);
try
{
createOrOpenMessageStoreDatabase();
upgradeIfNecessary(parent);
visitMessages(new MessageHandler()
{
@Override
public boolean handle(StoredMessage<?> storedMessage)
{
long id = storedMessage.getMessageNumber();
if (_messageId.get() < id)
{
_messageId.set(id);
}
return true;
}
});
}
catch (SQLException e)
{
throw new StoreException("Unable to activate message store ", e);
}
}
}
protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
{
PreparedStatement statement = conn.prepareStatement(SELECT_FROM_DB_VERSION);
try
{
ResultSet rs = statement.executeQuery();
try
{
if(!rs.next())
{
throw new StoreException(DB_VERSION_TABLE_NAME + " does not contain the database version");
}
int version = rs.getInt(1);
switch (version)
{
case 6:
upgradeFromV6();
case 7:
upgradeFromV7(parent);
case DB_VERSION:
return;
default:
throw new StoreException("Unknown database version: " + version);
}
}
finally
{
rs.close();
}
}
finally
{
statement.close();
}
}
finally
{
conn.close();
}
}
private void upgradeFromV6() throws SQLException
{
updateDbVersion(7);
}
private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
{
Connection connection = newConnection();
try
{
Map<UUID,Map<String,Object>> bindingsToUpdate = new HashMap<UUID, Map<String, Object>>();
List<UUID> others = new ArrayList<UUID>();
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(_module);
PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
try
{
ResultSet rs = stmt.executeQuery();
try
{
while (rs.next())
{
UUID id = UUID.fromString(rs.getString(1));
String objectType = rs.getString(2);
Map<String,Object> attributes = objectMapper.readValue(getBlobAsString(rs, 3),Map.class);
if(objectType.endsWith("Binding"))
{
bindingsToUpdate.put(id,attributes);
}
else
{
others.add(id);
}
}
}
catch (JsonMappingException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (JsonParseException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (IOException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
stmt = connection.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
try
{
for (UUID id : others)
{
stmt.setString(1, id.toString());
stmt.setString(2, "VirtualHost");
stmt.setString(3, parent.getId().toString());
stmt.execute();
}
for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
{
stmt.setString(1, bindingEntry.getKey().toString());
stmt.setString(2,"Queue");
stmt.setString(3, bindingEntry.getValue().remove("queue").toString());
stmt.execute();
stmt.setString(1, bindingEntry.getKey().toString());
stmt.setString(2,"Exchange");
stmt.setString(3, bindingEntry.getValue().remove("exchange").toString());
stmt.execute();
}
}
finally
{
stmt.close();
}
stmt = connection.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
{
stmt.setString(1, "Binding");
byte[] attributesAsBytes = objectMapper.writeValueAsBytes(bindingEntry.getValue());
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
stmt.setBinaryStream(2, bis, attributesAsBytes.length);
stmt.setString(3, bindingEntry.getKey().toString());
stmt.execute();
}
}
catch (JsonMappingException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (JsonGenerationException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
catch (IOException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
finally
{
stmt.close();
}
stmt = connection.prepareStatement(UPDATE_DB_VERSION);
try
{
stmt.setInt(1, 8);
stmt.execute();
}
finally
{
stmt.close();
}
connection.commit();
}
finally
{
connection.close();
}
}
private void updateDbVersion(int newVersion) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
{
PreparedStatement statement = conn.prepareStatement(UPDATE_DB_VERSION);
try
{
statement.setInt(1,newVersion);
statement.execute();
}
finally
{
statement.close();
}
}
finally
{
conn.close();
}
}
protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException;
abstract protected Logger getLogger();
abstract protected String getSqlBlobType();
abstract protected String getSqlVarBinaryType(int size);
abstract protected String getSqlBigIntType();
protected void createOrOpenMessageStoreDatabase() throws SQLException
{
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
createXidTable(conn);
createXidActionTable(conn);
conn.close();
}
protected void createOrOpenConfigurationStoreDatabase() throws SQLException
{
Connection conn = newAutoCommitConnection();
createConfigVersionTable(conn);
createConfiguredObjectsTable(conn);
createConfiguredObjectHierarchyTable(conn);
conn.close();
}
private void createVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(DB_VERSION_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute(CREATE_DB_VERSION_TABLE);
}
finally
{
stmt.close();
}
PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_DB_VERSION);
try
{
pstmt.setInt(1, DB_VERSION);
pstmt.execute();
}
finally
{
pstmt.close();
}
}
}
private void createConfigVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(CONFIGURATION_VERSION_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute(CREATE_CONFIG_VERSION_TABLE);
}
finally
{
stmt.close();
}
PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_CONFIG_VERSION);
try
{
pstmt.setInt(1, DEFAULT_CONFIG_VERSION);
pstmt.execute();
}
finally
{
pstmt.close();
}
}
}
private void createConfiguredObjectsTable(final Connection conn) throws SQLException
{
if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ " ( id VARCHAR(36) not null, object_type varchar(255), attributes "+getSqlBlobType()+", PRIMARY KEY (id))");
}
finally
{
stmt.close();
}
}
}
private void createConfiguredObjectHierarchyTable(final Connection conn) throws SQLException
{
if(!tableExists(CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE " + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME
+ " ( child_id VARCHAR(36) not null, parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
}
finally
{
stmt.close();
}
}
}
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE "+ QUEUE_ENTRY_TABLE_NAME +" ( queue_id varchar(36) not null, message_id "
+ getSqlBigIntType() + " not null, PRIMARY KEY (queue_id, message_id) )");
}
finally
{
stmt.close();
}
}
}
private void createMetaDataTable(final Connection conn) throws SQLException
{
if(!tableExists(META_DATA_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE "
+ META_DATA_TABLE_NAME
+ " ( message_id "
+ getSqlBigIntType()
+ " not null, meta_data "
+ getSqlBlobType()
+ ", PRIMARY KEY ( message_id ) )");
}
finally
{
stmt.close();
}
}
}
private void createMessageContentTable(final Connection conn) throws SQLException
{
if(!tableExists(MESSAGE_CONTENT_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE "
+ MESSAGE_CONTENT_TABLE_NAME
+ " ( message_id "
+ getSqlBigIntType()
+ " not null, content "
+ getSqlBlobType()
+ ", PRIMARY KEY (message_id) )");
}
finally
{
stmt.close();
}
}
}
private void createXidTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE "
+ XID_TABLE_NAME
+ " ( format " + getSqlBigIntType() + " not null,"
+ " global_id "
+ getSqlVarBinaryType(64)
+ ", branch_id "
+ getSqlVarBinaryType(64)
+ " , PRIMARY KEY ( format, "
+
"global_id, branch_id ))");
}
finally
{
stmt.close();
}
}
}
private void createXidActionTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_ACTIONS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("CREATE TABLE " + XID_ACTIONS_TABLE_NAME + " ( format " + getSqlBigIntType() + " not null,"
+ " global_id " + getSqlVarBinaryType(64) + " not null, branch_id " + getSqlVarBinaryType(
64) + " not null, " +
"action_type char not null, queue_id varchar(36) not null, message_id " + getSqlBigIntType() + " not null" +
", PRIMARY KEY ( " +
"format, global_id, branch_id, action_type, queue_id, message_id))");
}
finally
{
stmt.close();
}
}
}
protected boolean tableExists(final String tableName, final Connection conn) throws SQLException
{
DatabaseMetaData metaData = conn.getMetaData();
ResultSet rs = metaData.getTables(null, null, "%", null);
try
{
while(rs.next())
{
final String table = rs.getString(3);
if(tableName.equalsIgnoreCase(table))
{
return true;
}
}
return false;
}
finally
{
rs.close();
}
}
private void setConfigVersion(int version) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(UPDATE_CONFIG_VERSION);
try
{
stmt.setInt(1, version);
stmt.execute();
}
finally
{
stmt.close();
}
}
finally
{
conn.close();
}
}
private int getConfigVersion() throws SQLException
{
Connection conn = newAutoCommitConnection();
try
{
Statement stmt = conn.createStatement();
try
{
ResultSet rs = stmt.executeQuery(SELECT_FROM_CONFIG_VERSION);
try
{
if(rs.next())
{
return rs.getInt(1);
}
return DEFAULT_CONFIG_VERSION;
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
}
finally
{
conn.close();
}
}
@Override
public void closeMessageStore()
{
if (_messageStoreOpen.compareAndSet(true, false))
{
if (!_configurationStoreOpen.get())
{
doClose();
}
}
}
@Override
public void closeConfigurationStore()
{
if (_configurationStoreOpen.compareAndSet(true, false))
{
if (!_messageStoreOpen.get())
{
doClose();
}
}
}
protected abstract void doClose();
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
checkMessageStoreOpen();
if(metaData.isPersistent())
{
return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
}
else
{
return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
}
}
private void removeMessage(long messageId)
{
try
{
Connection conn = newConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_META_DATA);
try
{
stmt.setLong(1,messageId);
int results = stmt.executeUpdate();
stmt.close();
if (results == 0)
{
getLogger().warn("Message metadata not found for message id " + messageId);
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("Deleted metadata for message " + messageId);
}
stmt = conn.prepareStatement(DELETE_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
results = stmt.executeUpdate();
}
finally
{
stmt.close();
}
conn.commit();
}
catch(SQLException e)
{
try
{
conn.rollback();
}
catch(SQLException t)
{
// ignore - we are re-throwing underlying exception
}
throw e;
}
finally
{
conn.close();
}
}
catch (SQLException e)
{
throw new StoreException("Error removing message with id " + messageId + " from database: " + e.getMessage(), e);
}
}
@Override
public void create(ConfiguredObjectRecord object) throws StoreException
{
checkConfigurationStoreOpen();
try
{
Connection conn = newConnection();
try
{
insertConfiguredObject(object, conn);
conn.commit();
}
finally
{
conn.close();
}
}
catch (SQLException e)
{
throw new StoreException("Error creating ConfiguredObject " + object);
}
}
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions enabled.
*/
protected Connection newAutoCommitConnection() throws SQLException
{
final Connection connection = newConnection();
try
{
connection.setAutoCommit(true);
}
catch (SQLException sqlEx)
{
try
{
connection.close();
}
finally
{
throw sqlEx;
}
}
return connection;
}
/**
* Convenience method to create a new Connection configured for TRANSACTION_READ_COMMITED
* isolation and with auto-commit transactions disabled.
*/
protected Connection newConnection() throws SQLException
{
final Connection connection = getConnection();
try
{
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
}
catch (SQLException sqlEx)
{
try
{
connection.close();
}
finally
{
throw sqlEx;
}
}
return connection;
}
protected abstract Connection getConnection() throws SQLException;
@Override
public Transaction newTransaction()
{
checkMessageStoreOpen();
return new JDBCTransaction();
}
private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
try
{
if (getLogger().isDebugEnabled())
{
getLogger().debug("Enqueuing message "
+ messageId
+ " on queue "
+ queue.getName()
+ " with id " + queue.getId()
+ " [Connection"
+ conn
+ "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
try
{
stmt.setString(1, queue.getId().toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
}
finally
{
stmt.close();
}
}
catch (SQLException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new StoreException("Error writing enqueued message with id " + messageId + " for queue " + queue.getName() + " with id " + queue.getId()
+ " to database", e);
}
}
private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
try
{
stmt.setString(1, queue.getId().toString());
stmt.setLong(2, messageId);
int results = stmt.executeUpdate();
if(results != 1)
{
throw new StoreException("Unable to find message with id " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("Dequeuing message " + messageId + " on queue " + queue.getName()
+ " with id " + queue.getId());
}
}
finally
{
stmt.close();
}
}
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new StoreException("Error deleting enqueued message with id " + messageId + " for queue " + queue.getName()
+ " with id " + queue.getId() + " from database", e);
}
}
private void removeXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId)
throws StoreException
{
Connection conn = connWrapper.getConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_XIDS);
try
{
stmt.setLong(1,format);
stmt.setBytes(2,globalId);
stmt.setBytes(3,branchId);
int results = stmt.executeUpdate();
if(results != 1)
{
throw new StoreException("Unable to find message with xid");
}
}
finally
{
stmt.close();
}
stmt = conn.prepareStatement(DELETE_FROM_XID_ACTIONS);
try
{
stmt.setLong(1,format);
stmt.setBytes(2,globalId);
stmt.setBytes(3,branchId);
int results = stmt.executeUpdate();
}
finally
{
stmt.close();
}
}
catch (SQLException e)
{
getLogger().error("Failed to dequeue: " + e.getMessage(), e);
throw new StoreException("Error deleting enqueued message with xid", e);
}
}
private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
{
Connection conn = connWrapper.getConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_XIDS);
try
{
stmt.setLong(1,format);
stmt.setBytes(2, globalId);
stmt.setBytes(3, branchId);
stmt.executeUpdate();
}
finally
{
stmt.close();
}
stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
try
{
stmt.setLong(1,format);
stmt.setBytes(2, globalId);
stmt.setBytes(3, branchId);
if(enqueues != null)
{
stmt.setString(4, "E");
for(Transaction.Record record : enqueues)
{
stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
}
if(dequeues != null)
{
stmt.setString(4, "D");
for(Transaction.Record record : dequeues)
{
stmt.setString(5, record.getResource().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
}
}
finally
{
stmt.close();
}
}
catch (SQLException e)
{
getLogger().error("Failed to enqueue: " + e.getMessage(), e);
throw new StoreException("Error writing xid ", e);
}
}
private static final class ConnectionWrapper
{
private final Connection _connection;
public ConnectionWrapper(Connection conn)
{
_connection = conn;
}
public Connection getConnection()
{
return _connection;
}
}
private void commitTran(ConnectionWrapper connWrapper) throws StoreException
{
try
{
Connection conn = connWrapper.getConnection();
conn.commit();
if (getLogger().isDebugEnabled())
{
getLogger().debug("commit tran completed");
}
conn.close();
}
catch (SQLException e)
{
throw new StoreException("Error commit tx: " + e.getMessage(), e);
}
finally
{
}
}
private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
return StoreFuture.IMMEDIATE_FUTURE;
}
private void abortTran(ConnectionWrapper connWrapper) throws StoreException
{
if (connWrapper == null)
{
throw new StoreException("Fatal internal error: transactional context is empty at abortTran");
}
if (getLogger().isDebugEnabled())
{
getLogger().debug("abort tran called: " + connWrapper.getConnection());
}
try
{
Connection conn = connWrapper.getConnection();
conn.rollback();
conn.close();
}
catch (SQLException e)
{
throw new StoreException("Error aborting transaction: " + e.getMessage(), e);
}
}
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
if(getLogger().isDebugEnabled())
{
getLogger().debug("Adding metadata for message " + messageId);
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_META_DATA);
try
{
stmt.setLong(1,messageId);
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[bodySize];
underlying[0] = (byte) metaData.getType().ordinal();
ByteBuffer buf = ByteBuffer.wrap(underlying);
buf.position(1);
buf = buf.slice();
metaData.writeToBuffer(buf);
ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
try
{
stmt.setBinaryStream(2,bis,underlying.length);
int result = stmt.executeUpdate();
if(result == 0)
{
throw new StoreException("Unable to add meta data for message " +messageId);
}
}
finally
{
try
{
bis.close();
}
catch (IOException e)
{
throw new SQLException(e);
}
}
}
finally
{
stmt.close();
}
}
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueueableMessage
{
private long _messageNumber;
private UUID _queueId;
public RecordImpl(UUID queueId, long messageNumber)
{
_messageNumber = messageNumber;
_queueId = queueId;
}
@Override
public TransactionLogResource getResource()
{
return this;
}
@Override
public EnqueueableMessage getMessage()
{
return this;
}
@Override
public long getMessageNumber()
{
return _messageNumber;
}
@Override
public boolean isPersistent()
{
return true;
}
@Override
public StoredMessage getStoredMessage()
{
throw new UnsupportedOperationException();
}
@Override
public String getName()
{
return _queueId.toString();
}
@Override
public UUID getId()
{
return _queueId;
}
@Override
public boolean isDurable()
{
return true;
}
}
private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
{
PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_META_DATA);
try
{
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
try
{
if(rs.next())
{
byte[] dataAsBytes = getBlobAsBytes(rs, 1);
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
StorableMessageMetaData metaData = type.createMetaData(buf);
return metaData;
}
else
{
throw new StoreException("Meta data not found for message with id " + messageId);
}
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
}
finally
{
conn.close();
}
}
protected abstract byte[] getBlobAsBytes(ResultSet rs, int col) throws SQLException;
private void addContent(Connection conn, long messageId, ByteBuffer src)
{
if(getLogger().isDebugEnabled())
{
getLogger().debug("Adding content for message " + messageId);
}
PreparedStatement stmt = null;
try
{
src = src.slice();
byte[] chunkData = new byte[src.limit()];
src.duplicate().get(chunkData);
stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
stmt.setBinaryStream(2, bis, chunkData.length);
stmt.executeUpdate();
}
catch (SQLException e)
{
closeConnection(conn);
throw new StoreException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
closePreparedStatement(stmt);
}
}
private int getContent(long messageId, int offset, ByteBuffer dst)
{
Connection conn = null;
PreparedStatement stmt = null;
try
{
conn = newAutoCommitConnection();
stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
ResultSet rs = stmt.executeQuery();
int written = 0;
if (rs.next())
{
byte[] dataAsBytes = getBlobAsBytes(rs, 1);
int size = dataAsBytes.length;
if (offset > size)
{
throw new StoreException("Offset " + offset + " is greater than message size " + size
+ " for message id " + messageId + "!");
}
written = size - offset;
if(written > dst.remaining())
{
written = dst.remaining();
}
dst.put(dataAsBytes, offset, written);
}
return written;
}
catch (SQLException e)
{
throw new StoreException("Error retrieving content from offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
closePreparedStatement(stmt);
closeConnection(conn);
}
}
@Override
public boolean isPersistent()
{
return true;
}
protected class JDBCTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
protected JDBCTransaction()
{
try
{
_connWrapper = new ConnectionWrapper(newConnection());
}
catch (SQLException e)
{
throw new StoreException(e);
}
}
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
checkMessageStoreOpen();
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
try
{
((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
}
}
_storeSizeIncrease += storedMessage.getMetaData().getContentSize();
AbstractJDBCMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
checkMessageStoreOpen();
AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void commitTran()
{
checkMessageStoreOpen();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync()
{
checkMessageStoreOpen();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
}
@Override
public void abortTran()
{
checkMessageStoreOpen();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
checkMessageStoreOpen();
AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
checkMessageStoreOpen();
AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
}
}
private class StoredJDBCMessage implements StoredMessage
{
private final long _messageId;
private final boolean _isRecovered;
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
private volatile SoftReference<byte[]> _dataRef;
StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
{
this(messageId, metaData, false);
}
StoredJDBCMessage(long messageId,
StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
_isRecovered = isRecovered;
if(!_isRecovered)
{
_metaData = metaData;
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
public StorableMessageMetaData getMetaData()
{
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
checkMessageStoreOpen();
try
{
metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
}
catch (SQLException e)
{
throw new StoreException(e);
}
_metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
}
@Override
public long getMessageNumber()
{
return _messageId;
}
@Override
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.slice();
if(_data == null)
{
_data = new byte[src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
src.duplicate().get(_data);
}
else
{
byte[] oldData = _data;
_data = new byte[oldData.length + src.remaining()];
_dataRef = new SoftReference<byte[]>(_data);
System.arraycopy(oldData,0,_data,0,oldData.length);
src.duplicate().get(_data, oldData.length, src.remaining());
}
}
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
byte[] data = _dataRef == null ? null : _dataRef.get();
if(data != null)
{
int length = Math.min(dst.remaining(), data.length - offsetInMessage);
dst.put(data, offsetInMessage, length);
return length;
}
else
{
checkMessageStoreOpen();
return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
ByteBuffer buf = ByteBuffer.allocate(size);
int length = getContent(offsetInMessage, buf);
buf.position(0);
buf.limit(length);
return buf;
}
@Override
public synchronized StoreFuture flushToStore()
{
checkMessageStoreOpen();
Connection conn = null;
try
{
if(!stored())
{
conn = newConnection();
store(conn);
conn.commit();
storedSizeChange(getMetaData().getContentSize());
}
}
catch (SQLException e)
{
if(getLogger().isDebugEnabled())
{
getLogger().debug("Error when trying to flush message " + _messageId + " to store: " + e);
}
throw new StoreException(e);
}
finally
{
closeConnection(conn);
}
return StoreFuture.IMMEDIATE_FUTURE;
}
@Override
public void remove()
{
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
AbstractJDBCMessageStore.this.removeMessage(_messageId);
storedSizeChange(-delta);
}
private synchronized void store(final Connection conn) throws SQLException
{
if (!stored())
{
try
{
storeMetaData(conn, _messageId, _metaData);
AbstractJDBCMessageStore.this.addContent(conn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
{
_metaData = null;
_data = null;
}
if(getLogger().isDebugEnabled())
{
getLogger().debug("Storing message " + _messageId + " to store");
}
}
}
private boolean stored()
{
return _metaData == null || _isRecovered;
}
}
protected void closeConnection(final Connection conn)
{
if(conn != null)
{
try
{
conn.close();
}
catch (SQLException e)
{
getLogger().error("Problem closing connection", e);
}
}
}
protected void closePreparedStatement(final PreparedStatement stmt)
{
if (stmt != null)
{
try
{
stmt.close();
}
catch(SQLException e)
{
getLogger().error("Problem closing prepared statement", e);
}
}
}
@Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
}
private void insertConfiguredObject(ConfiguredObjectRecord configuredObject, final Connection conn) throws StoreException
{
try
{
PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
try
{
stmt.setString(1, configuredObject.getId().toString());
ResultSet rs = stmt.executeQuery();
boolean exists;
try
{
exists = rs.next();
}
finally
{
rs.close();
}
// If we don't have any data in the result set then we can add this configured object
if (!exists)
{
PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
try
{
insertStmt.setString(1, configuredObject.getId().toString());
insertStmt.setString(2, configuredObject.getType());
if(configuredObject.getAttributes() == null)
{
insertStmt.setNull(3, Types.BLOB);
}
else
{
final Map<String, Object> attributes = configuredObject.getAttributes();
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(_module);
byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
insertStmt.execute();
}
finally
{
insertStmt.close();
}
writeHierarchy(configuredObject, conn);
}
}
finally
{
stmt.close();
}
}
catch (JsonMappingException e)
{
throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
catch (JsonGenerationException e)
{
throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
catch (IOException e)
{
throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
catch (SQLException e)
{
throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
}
@Override
public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
{
checkConfigurationStoreOpen();
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
try
{
Connection conn = newAutoCommitConnection();
try
{
for(ConfiguredObjectRecord record : objects)
{
if(removeConfiguredObject(record.getId(), conn) != 0)
{
removed.add(record.getId());
}
}
}
finally
{
conn.close();
}
}
catch (SQLException e)
{
throw new StoreException("Error deleting of configured objects " + Arrays.asList(objects) + " from database: " + e.getMessage(), e);
}
return removed.toArray(new UUID[removed.size()]);
}
private int removeConfiguredObject(final UUID id, final Connection conn) throws SQLException
{
final int results;
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
try
{
stmt.setString(1, id.toString());
results = stmt.executeUpdate();
}
finally
{
stmt.close();
}
stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECT_HIERARCHY);
try
{
stmt.setString(1, id.toString());
stmt.executeUpdate();
}
finally
{
stmt.close();
}
return results;
}
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
checkConfigurationStoreOpen();
try
{
Connection conn = newConnection();
try
{
for(ConfiguredObjectRecord record : records)
{
updateConfiguredObject(record, createIfNecessary, conn);
}
conn.commit();
}
finally
{
conn.close();
}
}
catch (SQLException e)
{
throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
}
}
private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
boolean createIfNecessary,
Connection conn)
throws SQLException, StoreException
{
PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
try
{
stmt.setString(1, configuredObject.getId().toString());
ResultSet rs = stmt.executeQuery();
try
{
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(_module);
if (rs.next())
{
PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
try
{
stmt2.setString(1, configuredObject.getType());
if (configuredObject.getAttributes() != null)
{
byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
configuredObject.getAttributes());
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
else
{
stmt2.setNull(2, Types.BLOB);
}
stmt2.setString(3, configuredObject.getId().toString());
stmt2.execute();
}
finally
{
stmt2.close();
}
}
else if(createIfNecessary)
{
PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
try
{
insertStmt.setString(1, configuredObject.getId().toString());
insertStmt.setString(2, configuredObject.getType());
if(configuredObject.getAttributes() == null)
{
insertStmt.setNull(3, Types.BLOB);
}
else
{
final Map<String, Object> attributes = configuredObject.getAttributes();
byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
insertStmt.execute();
}
finally
{
insertStmt.close();
}
writeHierarchy(configuredObject, conn);
}
}
finally
{
rs.close();
}
}
catch (JsonMappingException e)
{
throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
catch (JsonGenerationException e)
{
throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
catch (IOException e)
{
throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
finally
{
stmt.close();
}
}
private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
{
PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECT_HIERARCHY);
try
{
for(Map.Entry<String,ConfiguredObjectRecord> parentEntry : configuredObject.getParents().entrySet())
{
insertStmt.setString(1, configuredObject.getId().toString());
insertStmt.setString(2, parentEntry.getKey());
insertStmt.setString(3, parentEntry.getValue().getId().toString());
insertStmt.execute();
}
}
finally
{
insertStmt.close();
}
}
@Override
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
Connection conn = null;
try
{
conn = newAutoCommitConnection();
Statement stmt = conn.createStatement();
try
{
ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_META_DATA);
try
{
while (rs.next())
{
long messageId = rs.getLong(1);
byte[] dataAsBytes = getBlobAsBytes(rs, 2);
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
StorableMessageMetaData metaData = type.createMetaData(buf);
StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
if (!handler.handle(message))
{
break;
}
}
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
}
catch (SQLException e)
{
throw new StoreException("Error encountered when visiting messages", e);
}
finally
{
closeConnection(conn);
}
}
@Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
Connection conn = null;
try
{
conn = newAutoCommitConnection();
Statement stmt = conn.createStatement();
try
{
ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE_ENTRY);
try
{
while(rs.next())
{
String id = rs.getString(1);
long messageId = rs.getLong(2);
if (!handler.handle(UUID.fromString(id), messageId))
{
break;
}
}
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
}
catch(SQLException e)
{
throw new StoreException("Error encountered when visiting message instances", e);
}
finally
{
closeConnection(conn);
}
}
@Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
Connection conn = null;
try
{
conn = newAutoCommitConnection();
List<Xid> xids = new ArrayList<Xid>();
Statement stmt = conn.createStatement();
try
{
ResultSet rs = stmt.executeQuery(SELECT_ALL_FROM_XIDS);
try
{
while(rs.next())
{
long format = rs.getLong(1);
byte[] globalId = rs.getBytes(2);
byte[] branchId = rs.getBytes(3);
xids.add(new Xid(format, globalId, branchId));
}
}
finally
{
rs.close();
}
}
finally
{
stmt.close();
}
for(Xid xid : xids)
{
List<RecordImpl> enqueues = new ArrayList<RecordImpl>();
List<RecordImpl> dequeues = new ArrayList<RecordImpl>();
PreparedStatement pstmt = conn.prepareStatement(SELECT_ALL_FROM_XID_ACTIONS);
try
{
pstmt.setLong(1, xid.getFormat());
pstmt.setBytes(2, xid.getGlobalId());
pstmt.setBytes(3, xid.getBranchId());
ResultSet rs = pstmt.executeQuery();
try
{
while(rs.next())
{
String actionType = rs.getString(1);
UUID queueId = UUID.fromString(rs.getString(2));
long messageId = rs.getLong(3);
RecordImpl record = new RecordImpl(queueId, messageId);
List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
records.add(record);
}
}
finally
{
rs.close();
}
}
finally
{
pstmt.close();
}
if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(),
enqueues.toArray(new RecordImpl[enqueues.size()]),
dequeues.toArray(new RecordImpl[dequeues.size()])))
{
break;
}
}
}
catch (SQLException e)
{
throw new StoreException("Error encountered when visiting distributed transactions", e);
}
finally
{
closeConnection(conn);
}
}
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
protected abstract void storedSizeChange(int storeSizeIncrease);
@Override
public void onDelete()
{
// TODO should probably check we are closed
try
{
Connection conn = newAutoCommitConnection();
try
{
List<String> tables = new ArrayList<String>();
tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
tables.addAll(MESSAGE_STORE_TABLE_NAMES);
for (String tableName : tables)
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("DROP TABLE " + tableName);
}
catch(SQLException e)
{
getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
}
finally
{
stmt.close();
}
}
}
finally
{
conn.close();
}
}
catch(SQLException e)
{
getLogger().error("Exception while deleting store tables", e);
}
}
private static final class ConfiguredObjectRecordImpl implements ConfiguredObjectRecord
{
private final UUID _id;
private final String _type;
private final Map<String, Object> _attributes;
private final Map<String, ConfiguredObjectRecord> _parents = new HashMap<String, ConfiguredObjectRecord>();
private ConfiguredObjectRecordImpl(final UUID id,
final String type,
final Map<String, Object> attributes)
{
_id = id;
_type = type;
_attributes = Collections.unmodifiableMap(attributes);
}
@Override
public UUID getId()
{
return _id;
}
@Override
public String getType()
{
return _type;
}
private void addParent(String parentType, ConfiguredObjectRecord parent)
{
_parents.put(parentType, parent);
}
@Override
public Map<String, Object> getAttributes()
{
return _attributes;
}
@Override
public Map<String, ConfiguredObjectRecord> getParents()
{
return Collections.unmodifiableMap(_parents);
}
}
}