blob: fd385090bf0c373941cc50f7deaaa651775b5bcb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v1_0.store.jdbc;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import com.google.common.io.ByteStreams;
import com.google.common.io.CharStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ModelVersion;
import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
import org.apache.qpid.server.protocol.v1_0.LinkKey;
import org.apache.qpid.server.protocol.v1_0.store.AbstractLinkStore;
import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUtils;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.jdbc.JDBCContainer;
import org.apache.qpid.server.store.jdbc.JDBCDetails;
import org.apache.qpid.server.store.jdbc.JdbcUtils;
import org.apache.qpid.server.util.Action;
public class JDBCLinkStore extends AbstractLinkStore
{
private static final Logger LOGGER = LoggerFactory.getLogger(JDBCLinkStore.class);
private static final String LINKS_TABLE_NAME_SUFFIX = "AMQP_1_0_LINKS";
private static final String VERSION_TABLE_NAME_SUFFIX = "AMQP_1_0_LINKS_VERSION";
private final JDBCContainer _jdbcContainer;
private final String _tableNamePrefix;
private final String _sqlBlobType;
private final String _sqlTimestampType;
private final boolean _isUseBytesMethodsForBlob;
private final Action<Connection> _cleanUpAction;
JDBCLinkStore(final JDBCContainer jdbcContainer)
{
_jdbcContainer = jdbcContainer;
_tableNamePrefix = jdbcContainer.getTableNamePrefix();
JDBCDetails jdbcDetails = jdbcContainer.getJDBCDetails();
_sqlBlobType = jdbcDetails.getBlobType();
_sqlTimestampType = jdbcDetails.getTimestampType();
_isUseBytesMethodsForBlob = jdbcDetails.isUseBytesMethodsForBlob();
_cleanUpAction = this::cleanUp;
jdbcContainer.addDeleteAction(_cleanUpAction);
}
@Override
protected Collection<LinkDefinition<Source, Target>> doOpenAndLoad(final LinkStoreUpdater updater) throws StoreException
{
Collection<LinkDefinition<Source, Target>> linkDefinitions;
try
{
checkTransactionIsolationLevel();
createOrOpenStoreDatabase();
linkDefinitions = getLinks();
ModelVersion storedVersion = getStoredVersion();
ModelVersion currentVersion =
new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, BrokerModel.MODEL_MINOR_VERSION);
if (storedVersion.lessThan(currentVersion))
{
linkDefinitions = performUpdate(updater, linkDefinitions, storedVersion, currentVersion);
}
else if (currentVersion.lessThan(storedVersion))
{
throw new StoreException(String.format("Cannot downgrade the store from %s to %s",
storedVersion,
currentVersion));
}
}
catch (SQLException e)
{
throw new StoreException("Cannot open link store", e);
}
return linkDefinitions;
}
@Override
protected void doClose() throws StoreException
{
}
@Override
protected void doSaveLink(final LinkDefinition<Source, Target> link) throws StoreException
{
String linkKey = generateLinkKey(link);
Connection connection = getConnection();
try
{
connection.setAutoCommit(false);
connection.setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE);
try (PreparedStatement preparedStatement = connection.prepareStatement(
String.format(
"SELECT remote_container_id, link_name, link_role, source, target FROM %s WHERE link_key = ?",
getLinksTableName())))
{
preparedStatement.setString(1, linkKey);
try (ResultSet resultSet = preparedStatement.executeQuery())
{
if (resultSet.next())
{
update(connection, linkKey, link);
}
else
{
insert(connection, linkKey, link);
}
}
}
connection.commit();
}
catch (SQLException e)
{
try
{
connection.rollback();
}
catch (SQLException re)
{
LOGGER.debug("Rollback failed on rolling back saving link transaction", re);
}
throw new StoreException(String.format("Cannot save link %s", new LinkKey(link)), e);
}
finally
{
JdbcUtils.closeConnection(connection, LOGGER);
}
}
@Override
protected void doDeleteLink(final LinkDefinition<Source, Target> link) throws StoreException
{
try (Connection connection = getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(
String.format("DELETE FROM %s WHERE link_key = ?", getLinksTableName())))
{
preparedStatement.setString(1, generateLinkKey(link));
preparedStatement.execute();
}
catch (SQLException e)
{
throw new StoreException(String.format("Cannot delete link %s", new LinkKey(link)), e);
}
}
@Override
protected void doDelete()
{
_jdbcContainer.removeDeleteAction(_cleanUpAction);
try (Connection connection = getConnection())
{
cleanUp(connection);
}
catch (IllegalStateException e)
{
LOGGER.warn("Could not delete Link store: {}", e.getMessage());
}
catch (SQLException e)
{
throw new StoreException("Error deleting Link store", e);
}
}
private void cleanUp(final Connection connection)
{
JdbcUtils.dropTables(connection, LOGGER, Arrays.asList(getLinksTableName(), getVersionTableName()));
}
@Override
public TerminusDurability getHighestSupportedTerminusDurability()
{
return TerminusDurability.CONFIGURATION;
}
private void checkTransactionIsolationLevel() throws SQLException
{
try (Connection connection = getConnection())
{
DatabaseMetaData metaData = connection.getMetaData();
if (!metaData.supportsTransactionIsolationLevel(Connection.TRANSACTION_SERIALIZABLE))
{
throw new StoreException(String.format(
"The RDBMS '%s' does not support required transaction isolation level 'serializable'",
metaData.getDatabaseProductName()));
}
}
}
private Connection getConnection()
{
return _jdbcContainer.getConnection();
}
private void createOrOpenStoreDatabase() throws SQLException
{
try (Connection conn = getConnection())
{
conn.setAutoCommit(true);
createLinksTable(conn);
createVersionTable(conn);
}
}
private void createVersionTable(final Connection conn) throws SQLException
{
String versionTableName = getVersionTableName();
if (!JdbcUtils.tableExists(versionTableName, conn))
{
try (Statement stmt = conn.createStatement())
{
stmt.execute(String.format("CREATE TABLE %s"
+ " (version varchar(10) PRIMARY KEY ,"
+ " version_time %s)", versionTableName, _sqlTimestampType));
}
updateVersion(conn, ModelVersion.fromString(BrokerModel.MODEL_VERSION));
}
}
private void createLinksTable(final Connection conn) throws SQLException
{
if (!JdbcUtils.tableExists(getLinksTableName(), conn))
{
try (Statement stmt = conn.createStatement())
{
stmt.execute(String.format("CREATE TABLE %1$s"
+ " ( link_key varchar(44) PRIMARY KEY ,"
+ " remote_container_id %2$s, "
+ " link_name %2$s,"
+ " link_role INTEGER,"
+ " source %2$s,"
+ " target %2$s )", getLinksTableName(), _sqlBlobType));
}
}
}
private String getLinksTableName()
{
return _tableNamePrefix + LINKS_TABLE_NAME_SUFFIX;
}
private String getVersionTableName()
{
return _tableNamePrefix + VERSION_TABLE_NAME_SUFFIX;
}
private Collection<LinkDefinition<Source, Target>> performUpdate(final LinkStoreUpdater updater,
Collection<LinkDefinition<Source, Target>> linkDefinitions,
final ModelVersion storedVersion,
final ModelVersion currentVersion)
throws SQLException
{
linkDefinitions = updater.update(storedVersion.toString(), linkDefinitions);
Connection connection = getConnection();
try
{
connection.setAutoCommit(false);
try (Statement statement = connection.createStatement())
{
statement.execute("DELETE FROM " + getLinksTableName());
}
for (LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition : linkDefinitions)
{
insert(connection, generateLinkKey(linkDefinition), linkDefinition);
}
updateVersion(connection, currentVersion);
connection.commit();
}
catch (SQLException e)
{
try
{
connection.rollback();
}
catch (SQLException re)
{
LOGGER.debug("Cannot rollback transaction", re);
}
throw e;
}
finally
{
JdbcUtils.closeConnection(connection, LOGGER);
}
return linkDefinitions;
}
private Collection<LinkDefinition<Source, Target>> getLinks() throws SQLException
{
Collection<LinkDefinition<Source, Target>> links = new ArrayList<>();
try (Connection connection = getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format(
"SELECT remote_container_id, link_name, link_role, source, target FROM %s",
getLinksTableName())))
{
while (resultSet.next())
{
String remoteContainerId = getBlobValueAsString(resultSet, 1);
String linkName = getBlobValueAsString(resultSet, 2);
Role role = Role.valueOf(resultSet.getBoolean(3));
Source source = (Source) getBlobAsAmqpObject(resultSet, 4);
Target target = (Target) getBlobAsAmqpObject(resultSet, 5);
links.add(new LinkDefinitionImpl<>(remoteContainerId, linkName, role, source, target));
}
}
catch (IllegalArgumentException e)
{
throw new StoreException("Cannot load links from store", e);
}
return links;
}
private Object getBlobAsAmqpObject(final ResultSet resultSet, final int index) throws SQLException
{
byte[] sourceBytes;
if (_isUseBytesMethodsForBlob)
{
sourceBytes = resultSet.getBytes(index);
}
else
{
Blob blob = resultSet.getBlob(index);
try (InputStream is = blob.getBinaryStream())
{
sourceBytes = ByteStreams.toByteArray(is);
}
catch (IOException e)
{
throw new StoreException("Cannot convert blob to string", e);
}
finally
{
blob.free();
}
}
return LinkStoreUtils.amqpBytesToObject(sourceBytes);
}
private String getBlobValueAsString(final ResultSet resultSet, final int index) throws SQLException
{
if (_isUseBytesMethodsForBlob)
{
return new String(resultSet.getBytes(index), UTF_8);
}
Blob blob = resultSet.getBlob(index);
try (InputStream is = blob.getBinaryStream();
InputStreamReader isr = new InputStreamReader(is, UTF_8))
{
return CharStreams.toString(isr);
}
catch (IOException e)
{
throw new StoreException("Cannot convert blob to string", e);
}
finally
{
blob.free();
}
}
private ModelVersion getStoredVersion() throws SQLException
{
ModelVersion version = null;
try (Connection connection = getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(String.format("SELECT version FROM %s",
getVersionTableName())))
{
while (resultSet.next())
{
ModelVersion storedVersion = ModelVersion.fromString(resultSet.getString(1));
if (version == null || version.lessThan(storedVersion))
{
version = storedVersion;
}
}
}
if (version == null)
{
throw new StoreException("Version of links is not found");
}
return version;
}
private void updateVersion(final Connection connection, final ModelVersion currentVersion) throws SQLException
{
String version = currentVersion.toString();
try (PreparedStatement statement = connection.prepareStatement(String.format(
"INSERT INTO %s (version, version_time) VALUES (?,?)",
getVersionTableName())))
{
statement.setString(1, version);
statement.setDate(2, new java.sql.Date(System.currentTimeMillis()));
if (statement.executeUpdate() != 1)
{
throw new StoreException(String.format("Cannot insert version '%s' into version table", version));
}
}
}
private void insert(final Connection connection,
final String linkKey,
final LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition)
throws SQLException
{
try (PreparedStatement statement = connection.prepareStatement(String.format(
"INSERT INTO %s (link_key, remote_container_id, link_name, link_role, source, target) VALUES (?,?,?,?,?,?)",
getLinksTableName())))
{
statement.setString(1, linkKey);
saveStringAsBlob(statement, 2, linkDefinition.getRemoteContainerId());
saveStringAsBlob(statement, 3, linkDefinition.getName());
statement.setInt(4, linkDefinition.getRole().getValue() ? 1 : 0);
saveObjectAsBlob(statement, 5, linkDefinition.getSource());
saveObjectAsBlob(statement, 6, linkDefinition.getTarget());
if (statement.executeUpdate() != 1)
{
throw new StoreException(String.format("Cannot save link %s", new LinkKey(linkDefinition)));
}
}
}
private void update(final Connection connection,
final String linkKey,
final LinkDefinition<? extends BaseSource, ? extends BaseTarget> linkDefinition)
throws SQLException
{
try (PreparedStatement statement = connection.prepareStatement(String.format(
"UPDATE %s SET source = ?, target = ? WHERE link_key = ?",
getLinksTableName())))
{
saveObjectAsBlob(statement, 1, linkDefinition.getSource());
saveObjectAsBlob(statement, 2, linkDefinition.getTarget());
statement.setString(3, linkKey);
if (statement.executeUpdate() != 1)
{
throw new StoreException(String.format("Cannot save link %s", new LinkKey(linkDefinition)));
}
}
}
private void saveObjectAsBlob(final PreparedStatement statement, final int index, final Object object)
throws SQLException
{
saveBytesAsBlob(statement, index, LinkStoreUtils.objectToAmqpBytes(object));
}
private void saveBytesAsBlob(final PreparedStatement statement, final int index, final byte[] bytes)
throws SQLException
{
if (_isUseBytesMethodsForBlob)
{
statement.setBytes(index, bytes);
}
else
{
try (InputStream inputStream = new ByteArrayInputStream(bytes))
{
statement.setBlob(index, inputStream);
}
catch (IOException e)
{
throw new StoreException("Cannot save link", e);
}
}
}
private void saveStringAsBlob(final PreparedStatement statement, final int index, final String value)
throws SQLException
{
saveBytesAsBlob(statement, index, value.getBytes(UTF_8));
}
private String generateLinkKey(final LinkDefinition<?, ?> linkDefinition)
{
MessageDigest md;
try
{
md = MessageDigest.getInstance("SHA-256");
}
catch (NoSuchAlgorithmException e)
{
throw new StoreException("Cannot generate SHA-256 checksum", e);
}
md.update(linkDefinition.getRemoteContainerId().getBytes(UTF_8));
md.update(linkDefinition.getName().getBytes(UTF_8));
md.update(linkDefinition.getRole().getValue() ? (byte) 1 : (byte) 0);
return Base64.getEncoder().encodeToString(md.digest());
}
}