blob: d79752cd10c7358d14e9454dd5cd9ddf62d6f09f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.store.preferences;
import static org.apache.qpid.server.store.JdbcUtils.tableExists;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
import org.apache.qpid.server.model.ModelVersion;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.util.BaseAction;
public abstract class AbstractJDBCPreferenceStore implements PreferenceStore
{
private static final String PREFERENCES_VERSION_TABLE_NAME = "PREFERENCES_VERSION";
private static final String PREFERENCES_TABLE_NAME = "PREFERENCES";
private static final String CREATE_PREFERENCES_VERSION_TABLE =
"CREATE TABLE " + PREFERENCES_VERSION_TABLE_NAME + " ( version VARCHAR(20) NOT NULL )";
private static final String INSERT_INTO_PREFERENCES_VERSION =
"INSERT INTO " + PREFERENCES_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
private static final String SELECT_FROM_PREFERENCES_VERSION =
"SELECT version FROM " + PREFERENCES_VERSION_TABLE_NAME;
private static final String INSERT_INTO_PREFERENCES = "INSERT INTO " + PREFERENCES_TABLE_NAME + " ( id, attributes ) VALUES ( ?, ? )";
private static final String DELETE_FROM_PREFERENCES = "DELETE FROM " + PREFERENCES_TABLE_NAME + " where id = ?";
private static final String SELECT_FROM_PREFERENCES = "SELECT id, attributes FROM " + PREFERENCES_TABLE_NAME;
private static final String FIND_PREFERENCE = "SELECT attributes FROM " + PREFERENCES_TABLE_NAME + " WHERE id = ?";
private static final String UPDATE_PREFERENCES = "UPDATE " + PREFERENCES_TABLE_NAME + " SET attributes = ? WHERE id = ?";
private final AtomicReference<StoreState> _storeState = new AtomicReference<>(StoreState.CLOSED);
private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true);
@Override
public Collection<PreferenceRecord> openAndLoad(final PreferenceStoreUpdater updater) throws StoreException
{
if (!_storeState.compareAndSet(StoreState.CLOSED, StoreState.OPENING))
{
throw new IllegalStateException(String.format("PreferenceStore cannot be opened when in state '%s'",
_storeState.get()));
}
try
{
_storeState.set(StoreState.OPENED);
Collection<PreferenceRecord> records;
try (Connection connection = getConnection())
{
createVersionTable(connection);
createPreferencesTable(connection);
ModelVersion preferencesVersion = getPreferencesVersion(connection);
ModelVersion brokerModelVersion = ModelVersion.fromString(BrokerModel.MODEL_VERSION);
if (brokerModelVersion.lessThan(preferencesVersion))
{
throw new StoreException(String.format("Cannot downgrade preference store from '%s' to '%s'", preferencesVersion, brokerModelVersion));
}
records = getPreferenceRecords(connection);
if (preferencesVersion.lessThan(brokerModelVersion))
{
final Collection<UUID> ids = new HashSet<>();
for (PreferenceRecord record : records)
{
ids.add(record.getId());
}
records = updater.updatePreferences(preferencesVersion.toString(), records);
replace(ids, records);
}
}
return records;
}
catch (SQLException e)
{
_storeState.set(StoreState.ERRORED);
close();
throw new StoreException(e);
}
}
@Override
public void updateOrCreate(final Collection<PreferenceRecord> preferenceRecords)
{
_useOrCloseRWLock.readLock().lock();
try
{
if (!getStoreState().equals(StoreState.OPENED))
{
throw new IllegalStateException("PreferenceStore is not opened");
}
performSafeTransaction(new BaseAction<Connection, Exception>()
{
@Override
public void performAction(final Connection connection) throws Exception
{
updateOrCreateInternal(connection, preferenceRecords);
}
});
}
finally
{
_useOrCloseRWLock.readLock().unlock();
}
}
@Override
public void replace(final Collection<UUID> preferenceRecordsToRemove,
final Collection<PreferenceRecord> preferenceRecordsToAdd)
{
_useOrCloseRWLock.readLock().lock();
try
{
if (!getStoreState().equals(StoreState.OPENED))
{
throw new IllegalStateException("PreferenceStore is not opened");
}
performSafeTransaction(new BaseAction<Connection, Exception>()
{
@Override
public void performAction(final Connection connection) throws Exception
{
for (UUID id : preferenceRecordsToRemove)
{
try (PreparedStatement deleteStatement = connection.prepareStatement(DELETE_FROM_PREFERENCES))
{
deleteStatement.setString(1, id.toString());
int deletedCount = deleteStatement.executeUpdate();
if (deletedCount == 1)
{
getLogger().debug(String.format(
"Failed to delete preference with id %s : no such record",
id));
}
}
}
updateOrCreateInternal(connection, preferenceRecordsToAdd);
}
});
}
finally
{
_useOrCloseRWLock.readLock().unlock();
}
}
@Override
public void onDelete()
{
_useOrCloseRWLock.writeLock().lock();
try
{
close();
doDelete();
}
finally
{
_useOrCloseRWLock.writeLock().unlock();
}
}
@Override
public void close()
{
_useOrCloseRWLock.writeLock().lock();
try
{
while (true)
{
StoreState storeState = getStoreState();
if (storeState.equals(StoreState.OPENED) || storeState.equals(StoreState.ERRORED))
{
if (_storeState.compareAndSet(storeState, StoreState.CLOSING))
{
break;
}
}
else if (storeState.equals(StoreState.CLOSED) || storeState.equals(StoreState.CLOSING))
{
return;
}
}
doClose();
_storeState.set(StoreState.CLOSED);
}
finally
{
_useOrCloseRWLock.writeLock().unlock();
}
}
protected void dropTables(final Connection connection) throws SQLException
{
try (Statement dropTableStatement = connection.createStatement())
{
dropTableStatement.execute("DROP TABLE " + PREFERENCES_TABLE_NAME);
dropTableStatement.execute("DROP TABLE " + PREFERENCES_VERSION_TABLE_NAME);
}
catch (SQLException e)
{
getLogger().warn("Failed to drop preferences table", e);
}
}
protected abstract void doDelete();
protected abstract void doClose();
protected abstract Logger getLogger();
protected abstract Connection getConnection() throws SQLException;
protected abstract String getSqlBlobType();
protected abstract String getBlobAsString(ResultSet rs, int col) throws SQLException;
StoreState getStoreState()
{
return _storeState.get();
}
private void updateOrCreateInternal(final Connection conn,
final Collection<PreferenceRecord> preferenceRecords)
throws SQLException, JsonProcessingException
{
for (PreferenceRecord record : preferenceRecords)
{
try (PreparedStatement stmt = conn.prepareStatement(FIND_PREFERENCE))
{
stmt.setString(1, record.getId().toString());
try (ResultSet rs = stmt.executeQuery())
{
if (rs.next())
{
try (PreparedStatement updateStatement = conn.prepareStatement(UPDATE_PREFERENCES))
{
setAttributesAsBlob(updateStatement, 1, record.getAttributes());
updateStatement.setString(2, record.getId().toString());
updateStatement.execute();
}
}
else
{
try (PreparedStatement insertStatement = conn.prepareStatement(INSERT_INTO_PREFERENCES))
{
insertStatement.setString(1, record.getId().toString());
setAttributesAsBlob(insertStatement, 2, record.getAttributes());
insertStatement.execute();
}
}
}
}
}
}
private void performSafeTransaction(final BaseAction<Connection, Exception> transactedAction)
{
Connection connection = null;
try
{
connection = getTransactedConnection();
transactedAction.performAction(connection);
connection.commit();
}
catch (Exception e)
{
try
{
if (connection != null)
{
connection.rollback();
}
}
catch (SQLException e1)
{
getLogger().error("Failed to rollback transaction", e1);
}
throw new StoreException(e);
}
finally
{
try
{
if (connection != null)
{
connection.close();
}
}
catch (SQLException e)
{
getLogger().warn("Failed to close JDBC connection", e);
}
}
}
private Connection getTransactedConnection() throws SQLException
{
final Connection connection = getConnection();
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
return connection;
}
private void setAttributesAsBlob(final PreparedStatement preparedSqlStatement,
final int parameterIndex,
final Map<String, Object> attributes)
throws JsonProcessingException, SQLException
{
final ObjectMapper objectMapper = ConfiguredObjectJacksonModule.newObjectMapper(true);
if (attributes != null)
{
byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
preparedSqlStatement.setBinaryStream(parameterIndex, bis, attributesAsBytes.length);
}
else
{
preparedSqlStatement.setNull(parameterIndex, Types.BLOB);
}
}
private void createVersionTable(final Connection conn) throws SQLException
{
if (!tableExists(PREFERENCES_VERSION_TABLE_NAME, conn))
{
try (Statement stmt = conn.createStatement())
{
stmt.execute(CREATE_PREFERENCES_VERSION_TABLE);
}
try (PreparedStatement pstmt = conn.prepareStatement(INSERT_INTO_PREFERENCES_VERSION))
{
pstmt.setString(1, BrokerModel.MODEL_VERSION);
pstmt.execute();
}
}
}
private void createPreferencesTable(final Connection conn) throws SQLException
{
if (!tableExists(PREFERENCES_TABLE_NAME, conn))
{
try (Statement stmt = conn.createStatement())
{
stmt.execute("CREATE TABLE "
+ PREFERENCES_TABLE_NAME
+ " ( id VARCHAR(36) not null, attributes "
+ getSqlBlobType()
+ ", PRIMARY KEY (id))");
}
}
}
private ModelVersion getPreferencesVersion(Connection conn) throws SQLException
{
try (Statement stmt = conn.createStatement())
{
try (ResultSet rs = stmt.executeQuery(SELECT_FROM_PREFERENCES_VERSION))
{
if (rs.next())
{
String versionString = rs.getString(1);
try
{
return ModelVersion.fromString(versionString);
}
catch (IllegalArgumentException e)
{
throw new StoreException("preference store version is malformed", e);
}
}
throw new StoreException("No preferences version found");
}
}
}
private Collection<PreferenceRecord> getPreferenceRecords(final Connection connection) throws SQLException
{
Collection<PreferenceRecord> records = new LinkedHashSet<>();
final ObjectMapper objectMapper = new ObjectMapper();
try (PreparedStatement stmt = connection.prepareStatement(SELECT_FROM_PREFERENCES))
{
try (ResultSet rs = stmt.executeQuery())
{
while (rs.next())
{
String id = rs.getString(1);
String attributes = getBlobAsString(rs, 2);
final PreferenceRecord preferenceRecord = new PreferenceRecordImpl(UUID.fromString(id), objectMapper.readValue(attributes, Map.class));
records.add(preferenceRecord);
}
}
catch (IOException e)
{
throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
}
}
return records;
}
enum StoreState
{
CLOSED, OPENING, OPENED, CLOSING, ERRORED;
}
}