QPID-8214: [Broker-J][JDBC] Reduce the sizes of table names in the JDBC configuration store
diff --git a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
index 00d75c8..4951b71 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
@@ -173,7 +173,7 @@
org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name())));
}
- private Map<String,Object> map(Object... vals)
+ protected Map<String,Object> map(Object... vals)
{
Map<String,Object> map = new HashMap<>();
boolean isValue = false;
@@ -205,11 +205,16 @@
verify(_handler, never()).handle(any(ConfiguredObjectRecord.class));
}
- private ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
+ protected ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes)
{
return argThat(new ConfiguredObjectMatcher(id, type, attributes, ANY_MAP));
}
+ protected ConfiguredObjectRecord matchesRecord(UUID id, String type, Map<String, Object> attributes, Map<String,UUID> parents)
+ {
+ return argThat(new ConfiguredObjectMatcher(id, type, attributes, parents));
+ }
+
private static class ConfiguredObjectMatcher extends ArgumentMatcher<ConfiguredObjectRecord>
{
private final Map<String,Object> _matchingMap;
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCConfigurationStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCConfigurationStore.java
index c87d719..1b57755 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCConfigurationStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCConfigurationStore.java
@@ -25,6 +25,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -58,8 +59,12 @@
public abstract class AbstractJDBCConfigurationStore implements MessageStoreProvider, DurableConfigurationStore
{
private final static String CONFIGURATION_VERSION_TABLE_NAME_SUFFIX = "QPID_CONFIG_VERSION";
- private final static String CONFIGURED_OBJECTS_TABLE_NAME_SUFFIX = "QPID_CONFIGURED_OBJECTS";
- private final static String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME_SUFFIX = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+ private final static String VERSION_1_CONFIGURED_OBJECTS_TABLE_NAME_SUFFIX = "QPID_CONFIGURED_OBJECTS";
+ private final static String VERSION_1_CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME_SUFFIX = "QPID_CONFIGURED_OBJECT_HIERARCHY";
+ private final static String CONFIGURATION_STRUCTURE_VERSION_TABLE_NAME_SUFFIX = "QPID_CFG_VERSION";
+ private final static String CONFIGURED_OBJECTS_TABLE_NAME_SUFFIX = "QPID_CFG_OBJECTS";
+ private final static String CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME_SUFFIX = "QPID_CFG_HIERARCHY";
+ private static final int CONFIG_DB_VERSION = 2;
private static final int DEFAULT_CONFIG_VERSION = 0;
private final Set<Action<Connection>> _deleteActions = Collections.newSetFromMap(new ConcurrentHashMap<>());;
@@ -153,6 +158,21 @@
return _tableNamePrefix + CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME_SUFFIX;
}
+ private String getConfigStructureVersionTableName()
+ {
+ return _tableNamePrefix + CONFIGURATION_STRUCTURE_VERSION_TABLE_NAME_SUFFIX;
+ }
+
+ private String getVersion1ConfiguredObjectsTableName()
+ {
+ return _tableNamePrefix + VERSION_1_CONFIGURED_OBJECTS_TABLE_NAME_SUFFIX;
+ }
+
+ private String getVersion1ConfiguredObjectHierarchyTableName()
+ {
+ return _tableNamePrefix + VERSION_1_CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME_SUFFIX;
+ }
+
private Collection<ConfiguredObjectRecordImpl> doVisitAllConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws SQLException
{
Map<UUID, ConfiguredObjectRecordImpl> configuredObjects = new HashMap<UUID, ConfiguredObjectRecordImpl>();
@@ -246,6 +266,31 @@
+ configVersion);
}
}
+ else
+ {
+ try (PreparedStatement statement = connection.prepareStatement(String.format("SELECT version FROM %s",
+ getConfigStructureVersionTableName())))
+ {
+ try (ResultSet rs = statement.executeQuery())
+ {
+ if (!rs.next())
+ {
+ throw new StoreException(getConfigStructureVersionTableName()
+ + " does not contain the configuration database version");
+ }
+ int version = rs.getInt(1);
+ switch (version)
+ {
+ case 1:
+ upgradeFromConfigVersion1();
+ case CONFIG_DB_VERSION:
+ return;
+ default:
+ throw new StoreException("Unknown configuration database version: " + version);
+ }
+ }
+ }
+ }
}
catch (SQLException se)
{
@@ -258,6 +303,117 @@
}
+ private void upgradeFromConfigVersion1() throws SQLException
+ {
+ Connection connection = newConnection();
+ try
+ {
+ try (PreparedStatement stmt = connection.prepareStatement(String.format(
+ "SELECT id, object_type, attributes FROM %s",
+ getVersion1ConfiguredObjectsTableName())))
+ {
+ try (ResultSet rs = stmt.executeQuery())
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = getBlobAsString(rs, 3);
+
+ try (PreparedStatement insertStmt = connection.prepareStatement(String.format(
+ "INSERT INTO %s ( id, object_type, attributes) VALUES (?,?,?)",
+ getConfiguredObjectsTableName())))
+ {
+ insertStmt.setString(1, id);
+ insertStmt.setString(2, objectType);
+ if (attributes == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ byte[] attributesAsBytes = attributes.getBytes(StandardCharsets.UTF_8);
+
+ try(ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes))
+ {
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Unexpected exception: " + e.getMessage(), e);
+ }
+ }
+ insertStmt.execute();
+ }
+ }
+ }
+ }
+
+ try (PreparedStatement stmt = connection.prepareStatement(String.format(
+ "SELECT child_id, parent_type, parent_id FROM %s",
+ getVersion1ConfiguredObjectHierarchyTableName())))
+ {
+ try (ResultSet rs = stmt.executeQuery())
+ {
+ while (rs.next())
+ {
+ String childId = rs.getString(1);
+ String parentType = rs.getString(2);
+ String parentId = rs.getString(3);
+
+ try (PreparedStatement insertStmt = connection.prepareStatement(String.format(
+ "INSERT INTO %s ( child_id, parent_type, parent_id) VALUES (?,?,?)",
+ getConfiguredObjectHierarchyTableName())))
+ {
+ insertStmt.setString(1, childId);
+ insertStmt.setString(2, parentType);
+ insertStmt.setString(3, parentId);
+
+ insertStmt.execute();
+ }
+ }
+ }
+ }
+
+ updateConfigStructureVersionTableName(connection, 2);
+ connection.commit();
+
+ }
+ catch(SQLException | RuntimeException e)
+ {
+ try
+ {
+ connection.rollback();
+ }
+ catch(SQLException re)
+ {
+ }
+ throw e;
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ try(Connection c = newAutoCommitConnection())
+ {
+ JdbcUtils.dropTables(c,
+ getLogger(),
+ Arrays.asList(getVersion1ConfiguredObjectHierarchyTableName(),
+ getVersion1ConfiguredObjectsTableName()));
+ }
+ }
+
+ private void updateConfigStructureVersionTableName(Connection conn, int newVersion) throws SQLException
+ {
+ try (PreparedStatement statement = conn.prepareStatement("UPDATE " + getConfigStructureVersionTableName()
+ + " SET version = ?"))
+ {
+ statement.setInt(1, newVersion);
+ statement.execute();
+ }
+ }
+
private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
{
@SuppressWarnings("serial")
@@ -446,6 +602,7 @@
createConfiguredObjectsTable(conn);
createConfiguredObjectHierarchyTable(conn);
+ createConfigurationStructureVersionTable(conn);
}
catch (SQLException e)
{
@@ -457,18 +614,38 @@
}
}
+ private void createConfigurationStructureVersionTable(final Connection conn) throws SQLException
+ {
+ if (!tableExists(getConfigStructureVersionTableName(), conn))
+ {
+ try (Statement stmt = conn.createStatement())
+ {
+ stmt.execute(String.format("CREATE TABLE %s ( version int not null )",
+ getConfigStructureVersionTableName()));
+ }
+
+ int version = tableExists(getVersion1ConfiguredObjectsTableName(), conn) ? 1 : CONFIG_DB_VERSION;
+ try (PreparedStatement pstmt = conn.prepareStatement(String.format("INSERT INTO %s ( version ) VALUES ( ? )",
+ getConfigStructureVersionTableName())))
+ {
+ pstmt.setInt(1, version);
+ pstmt.execute();
+ }
+ }
+ }
+
private void dropConfigVersionTable(final Connection conn) throws SQLException
{
- if(!tableExists(getConfigurationVersionTableName(), conn))
+ dropTable(conn, getConfigurationVersionTableName());
+ }
+
+ private void dropTable(final Connection conn, final String tableName) throws SQLException
+ {
+ if(!tableExists(tableName, conn))
{
- Statement stmt = conn.createStatement();
- try
+ try (Statement stmt = conn.createStatement())
{
- stmt.execute("DROP TABLE " + getConfigurationVersionTableName());
- }
- finally
- {
- stmt.close();
+ stmt.execute(String.format("DROP TABLE %s", tableName));
}
}
}
@@ -848,7 +1025,8 @@
JdbcUtils.dropTables(conn,
getLogger(),
Arrays.asList(getConfiguredObjectsTableName(),
- getConfiguredObjectHierarchyTableName()));
+ getConfiguredObjectHierarchyTableName(),
+ getConfigStructureVersionTableName()));
}
}
diff --git a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStoreTest.java b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStoreTest.java
index b55acc3..2e7caf5 100644
--- a/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStoreTest.java
+++ b/broker-plugins/jdbc-store/src/test/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStoreTest.java
@@ -24,13 +24,21 @@
import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.assertTablesExistence;
import static org.apache.qpid.server.store.jdbc.TestJdbcUtils.getTableNames;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
@@ -40,6 +48,8 @@
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.store.AbstractDurableConfigurationStoreTestCase;
import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.virtualhostnode.jdbc.JDBCVirtualHostNode;
public class GenericJDBCConfigurationStoreTest extends AbstractDurableConfigurationStoreTestCase
@@ -66,7 +76,7 @@
@Test
public void testOnDelete() throws Exception
{
- try(Connection connection = openConnection())
+ try (Connection connection = openConnection())
{
GenericJDBCConfigurationStore store = (GenericJDBCConfigurationStore) getConfigurationStore();
Collection<String> expectedTables = Arrays.asList(store.getConfiguredObjectHierarchyTableName(),
@@ -91,6 +101,143 @@
assertEquals("Delete action was not invoked", true, deleted.get());
}
+ @Test
+ public void testUpgradeStoreStructure() throws Exception
+ {
+ GenericJDBCConfigurationStore store = (GenericJDBCConfigurationStore) getConfigurationStore();
+ store.closeConfigurationStore();
+ store.onDelete(getVirtualHostNode());
+
+ GenericJDBCConfigurationStore store2 = (GenericJDBCConfigurationStore) createConfigStore();
+
+ UUID hostId = UUID.randomUUID();
+ UUID queueId = UUID.randomUUID();
+ try (Connection connection = openConnection())
+ {
+ assertTablesExistence(Arrays.asList("QPID_CONFIGURED_OBJECTS", "QPID_CONFIGURED_OBJECT_HIERARCHY"),
+ getTableNames(connection), false);
+ try (Statement stmt = connection.createStatement())
+ {
+ stmt.execute("CREATE TABLE QPID_CONFIGURED_OBJECTS ( id VARCHAR(36) not null,"
+ + " object_type varchar(255), attributes blob, PRIMARY KEY (id))");
+ stmt.execute("CREATE TABLE QPID_CONFIGURED_OBJECT_HIERARCHY ( child_id VARCHAR(36) not null,"
+ + " parent_type varchar(255), parent_id VARCHAR(36), PRIMARY KEY (child_id, parent_type))");
+ }
+
+ try (PreparedStatement insertStmt = connection.prepareStatement(
+ "INSERT INTO QPID_CONFIGURED_OBJECTS ( id, object_type, attributes) VALUES (?,?,?)"))
+ {
+ insertStmt.setString(1, hostId.toString());
+ insertStmt.setString(2, "VirtualHost");
+ final byte[] attributesAsBytes = "{\"name\":\"testHost\"}".getBytes(StandardCharsets.UTF_8);
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes))
+ {
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ try (PreparedStatement insertStmt = connection.prepareStatement(
+ "INSERT INTO QPID_CONFIGURED_OBJECTS ( id, object_type, attributes) VALUES (?,?,?)"))
+ {
+ insertStmt.setString(1, queueId.toString());
+ insertStmt.setString(2, "Queue");
+ final byte[] attributesAsBytes = "{\"name\":\"testQueue\"}".getBytes(StandardCharsets.UTF_8);
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes))
+ {
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+
+ try (PreparedStatement insertStmt = connection.prepareStatement(
+ "INSERT INTO QPID_CONFIGURED_OBJECT_HIERARCHY "
+ + " ( child_id, parent_type, parent_id) VALUES (?,?,?)"))
+ {
+ insertStmt.setString(1, queueId.toString());
+ insertStmt.setString(2, "VirtualHost");
+ insertStmt.setString(3, hostId.toString());
+
+ insertStmt.execute();
+ }
+ }
+
+ store2.init(getVirtualHostNode());
+ store2.upgradeStoreStructure();
+
+ try (Connection connection = openConnection())
+ {
+ try
+ {
+ assertTablesExistence(Arrays.asList("QPID_CONFIGURED_OBJECTS", "QPID_CONFIGURED_OBJECT_HIERARCHY"),
+ getTableNames(connection), false);
+
+ assertTablesExistence(Collections.singletonList("QPID_CFG_VERSION"), getTableNames(connection), true);
+ }
+ finally
+ {
+ JdbcUtils.dropTables(connection,
+ store2.getLogger(),
+ Arrays.asList("QPID_CONFIGURED_OBJECTS", "QPID_CONFIGURED_OBJECT_HIERARCHY"));
+ }
+ }
+
+ ConfiguredObjectRecordHandler handler = mock(ConfiguredObjectRecordHandler.class);
+ store2.openConfigurationStore(handler);
+
+ verify(handler).handle(matchesRecord(hostId, "VirtualHost", map("name", "testHost")));
+ verify(handler).handle(matchesRecord(queueId,
+ "Queue",
+ map("name", "testQueue"),
+ Collections.singletonMap("VirtualHost", hostId)));
+ }
+
+ @Test
+ public void testUpgradeStoreStructureFromUnknownVersion() throws Exception
+ {
+ GenericJDBCConfigurationStore store = (GenericJDBCConfigurationStore) getConfigurationStore();
+ store.closeConfigurationStore();
+ store.onDelete(getVirtualHostNode());
+
+ GenericJDBCConfigurationStore store2 = (GenericJDBCConfigurationStore) createConfigStore();
+
+ try (Connection connection = openConnection())
+ {
+ assertTablesExistence(Collections.singletonList("QPID_CFG_VERSION"),
+ getTableNames(connection), false);
+ try (Statement stmt = connection.createStatement())
+ {
+ stmt.execute("CREATE TABLE QPID_CFG_VERSION ( version int not null )");
+ }
+ try (PreparedStatement insertStmt = connection.prepareStatement(
+ "INSERT INTO QPID_CFG_VERSION ( version) VALUES (?)"))
+ {
+ insertStmt.setInt(1, 0);
+ insertStmt.execute();
+ }
+ }
+
+ store2.init(getVirtualHostNode());
+
+ try
+ {
+ store2.upgradeStoreStructure();
+ fail("Exception is expected");
+ }
+ catch (StoreException e)
+ {
+ // pass
+ }
+ finally
+ {
+ try (Connection connection = openConnection())
+ {
+ JdbcUtils.dropTables(connection,
+ store2.getLogger(),
+ Arrays.asList("QPID_CFG_VERSION"));
+ }
+ }
+ }
+
@Override
protected VirtualHostNode createVirtualHostNode(final String storeLocation, final ConfiguredObjectFactory factory)
{